From 3223797ae5930587523a15be1ce00e62012dbdab Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 21:28:20 +0200 Subject: [PATCH] fix(dsrepl): attempt leadership transfer before server removal This should make it much less likely to hit weird edge cases that lead to duplicate Raft log entries because of client retries upon receiving `shutdown` from the leader being removed. --- .../src/emqx_ds_replication_layer_shard.erl | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 8f87b69b4..2d19ec7ef 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -114,6 +114,13 @@ get_server_local_preferred(DB, Shard) -> pick_random(get_shard_servers(DB, Shard)) end. +lookup_leader(DB, Shard) -> + %% NOTE + %% Does not block, but the result may be outdated or even unknown when there's + %% no servers on the local node. + ClusterName = get_cluster_name(DB, Shard), + ra_leaderboard:lookup_leader(ClusterName). + pick_local(Servers) -> case lists:keyfind(node(), 2, Servers) of Local when is_tuple(Local) -> @@ -181,7 +188,22 @@ add_local_server(DB, Shard) -> -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> + ShardServers = shard_servers(DB, Shard), LocalServer = local_server(DB, Shard), + case lookup_leader(DB, Shard) of + LocalServer -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% Timeouts are ignored, it's a best effort attempt. + [Candidate | _] = lists:delete(LocalServer, ShardServers), + _ = ra:transfer_leadership(LocalServer, Candidate), + _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); + _Another -> + ok + end, case remove_server(DB, Shard, LocalServer) of ok -> ra:force_delete_server(DB, LocalServer); @@ -351,3 +373,24 @@ memoize(Fun, Args) -> Result -> Result end. + +wait_until(Fun) -> + wait_until(Fun, 5_000, 250). + +wait_until(Fun, Timeout, Sleep) -> + Deadline = erlang:monotonic_time(millisecond) + Timeout, + loop_until(Fun, Deadline, Sleep). + +loop_until(Fun, Deadline, Sleep) -> + case Fun() of + true -> + ok; + false -> + case erlang:monotonic_time(millisecond) of + Now when Now < Deadline -> + timer:sleep(Sleep), + loop_until(Fun, Deadline, Sleep); + _ -> + timeout + end + end.