diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 896999af7..836e9df07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -609,13 +609,7 @@ ra_add_generation(DB, Shard) -> ?tag => add_generation, ?since => emqx_ds:timestamp_us() }, - Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of - {ok, Result, _Leader} -> - Result; - Error -> - error(Error, [DB, Shard]) - end. + ra_command(DB, Shard, Command, 10). ra_update_config(DB, Shard, Opts) -> Command = #{ @@ -623,20 +617,20 @@ ra_update_config(DB, Shard, Opts) -> ?config => Opts, ?since => emqx_ds:timestamp_us() }, - Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of - {ok, Result, _Leader} -> - Result; - Error -> - error(Error, [DB, Shard]) - end. + ra_command(DB, Shard, Command, 10). ra_drop_generation(DB, Shard, GenId) -> Command = #{?tag => drop_generation, ?generation => GenId}, + ra_command(DB, Shard, Command, 10). + +ra_command(DB, Shard, Command, Retries) -> Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case ra:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; + _Error when Retries > 0 -> + timer:sleep(?RA_TIMEOUT), + ra_command(DB, Shard, Command, Retries - 1); Error -> error(Error, [DB, Shard]) end.