fix(dsrepl): Retry sending ra commands to the leader

This commit is contained in:
ieQu1 2024-05-24 19:07:12 +02:00
parent 1ffc7d5d9e
commit 830b62d899
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 8 additions and 14 deletions

View File

@ -609,13 +609,7 @@ ra_add_generation(DB, Shard) ->
?tag => add_generation, ?tag => add_generation,
?since => emqx_ds:timestamp_us() ?since => emqx_ds:timestamp_us()
}, },
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), ra_command(DB, Shard, Command, 10).
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_update_config(DB, Shard, Opts) -> ra_update_config(DB, Shard, Opts) ->
Command = #{ Command = #{
@ -623,20 +617,20 @@ ra_update_config(DB, Shard, Opts) ->
?config => Opts, ?config => Opts,
?since => emqx_ds:timestamp_us() ?since => emqx_ds:timestamp_us()
}, },
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), ra_command(DB, Shard, Command, 10).
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_drop_generation(DB, Shard, GenId) -> ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId}, Command = #{?tag => drop_generation, ?generation => GenId},
ra_command(DB, Shard, Command, 10).
ra_command(DB, Shard, Command, Retries) ->
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command, ?RA_TIMEOUT) of case ra:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
_Error when Retries > 0 ->
timer:sleep(?RA_TIMEOUT),
ra_command(DB, Shard, Command, Retries - 1);
Error -> Error ->
error(Error, [DB, Shard]) error(Error, [DB, Shard])
end. end.