From d12e907209786a0170d5026f04de01ae949f92f7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 22:44:34 +0200 Subject: [PATCH] fix(dsrepl): correctly handle ra membership change command results Before this change, results similar to `{error, {no_more_servers_to_try, [{error, nodedown}, {error, not_member}]}}` were considered retryable failures, which is incorrect. --- .../src/emqx_ds_replication_layer_shard.erl | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 2d19ec7ef..f4c0d3b01 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -173,13 +173,14 @@ add_local_server(DB, Shard) -> membership => voter } end, - case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of + Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, + case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of {ok, _, _Leader} -> ok; {error, already_member} -> ok; - {error, Reason} -> - {error, recoverable, Reason} + Error -> + {error, recoverable, Error} end. %% @doc Remove a local server from the shard cluster and clean up on-disk data. @@ -219,13 +220,14 @@ drop_local_server(DB, Shard) -> ok | emqx_ds:error(_Reason). remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), - case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of + Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, + case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of {ok, _, _Leader} -> ok; {error, not_member} -> ok; - {error, Reason} -> - {error, recoverable, Reason} + Error -> + {error, recoverable, Error} end. -spec server_info @@ -272,6 +274,20 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. +%% + +ra_try_servers([Server | Rest], Fun, Args) -> + case erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + {error, Reason} when Reason == noproc; Reason == nodedown -> + ra_try_servers(Rest, Fun, Args); + ErrorOrTimeout -> + ErrorOrTimeout + end; +ra_try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + ra_overview(Server) -> case ra:member_overview(Server) of {ok, Overview, _Leader} ->