diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 8f87b69b4..2d19ec7ef 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -114,6 +114,13 @@ get_server_local_preferred(DB, Shard) -> pick_random(get_shard_servers(DB, Shard)) end. +lookup_leader(DB, Shard) -> + %% NOTE + %% Does not block, but the result may be outdated or even unknown when there's + %% no servers on the local node. + ClusterName = get_cluster_name(DB, Shard), + ra_leaderboard:lookup_leader(ClusterName). + pick_local(Servers) -> case lists:keyfind(node(), 2, Servers) of Local when is_tuple(Local) -> @@ -181,7 +188,22 @@ add_local_server(DB, Shard) -> -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> + ShardServers = shard_servers(DB, Shard), LocalServer = local_server(DB, Shard), + case lookup_leader(DB, Shard) of + LocalServer -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% Timeouts are ignored, it's a best effort attempt. + [Candidate | _] = lists:delete(LocalServer, ShardServers), + _ = ra:transfer_leadership(LocalServer, Candidate), + _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); + _Another -> + ok + end, case remove_server(DB, Shard, LocalServer) of ok -> ra:force_delete_server(DB, LocalServer); @@ -351,3 +373,24 @@ memoize(Fun, Args) -> Result -> Result end. + +wait_until(Fun) -> + wait_until(Fun, 5_000, 250). + +wait_until(Fun, Timeout, Sleep) -> + Deadline = erlang:monotonic_time(millisecond) + Timeout, + loop_until(Fun, Deadline, Sleep). + +loop_until(Fun, Deadline, Sleep) -> + case Fun() of + true -> + ok; + false -> + case erlang:monotonic_time(millisecond) of + Now when Now < Deadline -> + timer:sleep(Sleep), + loop_until(Fun, Deadline, Sleep); + _ -> + timeout + end + end.