From c4d1360b96b18925151c45b61691ab8c5608b340 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 15 Apr 2024 16:42:29 +0200 Subject: [PATCH 1/3] fix(dsrepl): trigger election for new ra servers unconditionallly Otherwise we might end up in a situation when there's no member online yet at the time of the election trigger, and the election will never happen. --- .../src/emqx_ds_replication_layer_shard.erl | 38 ++++++++----------- .../test/emqx_ds_replication_SUITE.erl | 4 +- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index f4c0d3b01..20d9ef481 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -341,29 +341,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> log_init_args => LogOpts }) end, - case Servers of - [LocalServer | _] -> - %% TODO - %% Not super robust, but we probably don't expect nodes to be down - %% when we bring up a fresh consensus group. Triggering election - %% is not really required otherwise. - %% TODO - %% Ensure that doing that on node restart does not disrupt consensus. - %% Edit: looks like it doesn't, this could actually be quite useful - %% to "steal" leadership from nodes that have too much leader load. - %% TODO - %% It doesn't really work that way. There's `ra:transfer_leadership/2` - %% for that. - try - ra:trigger_election(LocalServer, _Timeout = 1_000) - catch - %% TODO - %% Tolerating exceptions because server might be occupied with log - %% replay for a while. - exit:{timeout, _} when not Bootstrap -> - ok - end; - _ -> + %% NOTE + %% Triggering election is necessary when a new consensus group is being brought up. + %% TODO + %% It's probably a good idea to rebalance leaders across the cluster from time to + %% time. There's `ra:transfer_leadership/2` for that. + try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of + false -> + ok; + ok -> + ok + catch + %% TODO + %% Tolerating exceptions because server might be occupied with log replay for + %% a while. + exit:{timeout, _} when not Bootstrap -> ok end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 9fc55d170..3b0e37c7f 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -435,8 +435,8 @@ t_rebalance_offline_restarts(Config) -> erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) ), ?retry( - 500, - 10, + 1000, + 5, ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes]) ), From 89f42f117198645b8f08d59b1abad83bd21e7668 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 15 Apr 2024 16:43:52 +0200 Subject: [PATCH 2/3] fix(dsrepl): make placeholder shard process permanent under supervisor --- apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 195db7c34..2dd9ae332 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -232,7 +232,7 @@ shard_replication_spec(DB, Shard, Opts) -> #{ id => {Shard, replication}, start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]}, - restart => transient, + restart => permanent, type => worker }. From 5d7b2e2ce69ab9b4e01f2b2ac7882d1c277f8f60 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 15 Apr 2024 16:58:46 +0200 Subject: [PATCH 3/3] fix(dsrepl): attempt leadership transfer on terminate In addition to on removal. The reasoning is basically the same: try to avoid situations when log entries are replicated (or will be considered replicated when the new leader is elected) but the leader terminates before replying to the client. To be clear: this is a stupid solution. Something much more robust is needed. --- .../src/emqx_ds_builtin_db_sup.erl | 1 + .../src/emqx_ds_replication_layer_shard.erl | 58 ++++++++++++------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 2dd9ae332..ef1600500 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -232,6 +232,7 @@ shard_replication_spec(DB, Shard, Opts) -> #{ id => {Shard, replication}, start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]}, + shutdown => 10_000, restart => permanent, type => worker }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 20d9ef481..e0e70596a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -189,22 +189,9 @@ add_local_server(DB, Shard) -> -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> - ShardServers = shard_servers(DB, Shard), + %% NOTE: Timeouts are ignored, it's a best effort attempt. + _ = prep_stop_server(DB, Shard), LocalServer = local_server(DB, Shard), - case lookup_leader(DB, Shard) of - LocalServer -> - %% NOTE - %% Trigger leadership transfer *and* force to wait until the new leader - %% is elected and updated in the leaderboard. This should help to avoid - %% edge cases where entries appended right before removal are duplicated - %% due to client retries. - %% Timeouts are ignored, it's a best effort attempt. - [Candidate | _] = lists:delete(LocalServer, ShardServers), - _ = ra:transfer_leadership(LocalServer, Candidate), - _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); - _Another -> - ok - end, case remove_server(DB, Shard, LocalServer) of ok -> ra:force_delete_server(DB, LocalServer); @@ -300,7 +287,7 @@ ra_overview(Server) -> init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - ok = start_shard(DB, Shard, Opts), + ok = start_server(DB, Shard, Opts), {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> @@ -310,18 +297,18 @@ handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, {DB, Shard}) -> + %% NOTE: Timeouts are ignored, it's a best effort attempt. + catch prep_stop_server(DB, Shard), LocalServer = get_local_server(DB, Shard), ok = ra:stop_server(DB, LocalServer). %% -start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> +start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), case ra:restart_server(DB, LocalServer) of - ok -> - Bootstrap = false; {error, name_not_registered} -> Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, @@ -339,7 +326,11 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> initial_members => Servers, machine => Machine, log_init_args => LogOpts - }) + }); + ok -> + Bootstrap = false; + {error, {already_started, _}} -> + Bootstrap = false end, %% NOTE %% Triggering election is necessary when a new consensus group is being brought up. @@ -371,6 +362,29 @@ server_uid(_DB, Shard) -> %% +prep_stop_server(DB, Shard) -> + prep_stop_server(DB, Shard, 5_000). + +prep_stop_server(DB, Shard, Timeout) -> + LocalServer = get_local_server(DB, Shard), + Candidates = lists:delete(LocalServer, shard_servers(DB, Shard)), + case lookup_leader(DB, Shard) of + LocalServer when Candidates =/= [] -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% TODO: Candidate may be offline. + [Candidate | _] = Candidates, + _ = ra:transfer_leadership(LocalServer, Candidate), + wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, Timeout); + _Another -> + ok + end. + +%% + memoize(Fun, Args) -> %% NOTE: Assuming that the function is pure and never returns `undefined`. case persistent_term:get([Fun | Args], undefined) of @@ -382,8 +396,8 @@ memoize(Fun, Args) -> Result end. -wait_until(Fun) -> - wait_until(Fun, 5_000, 250). +wait_until(Fun, Timeout) -> + wait_until(Fun, Timeout, 100). wait_until(Fun, Timeout, Sleep) -> Deadline = erlang:monotonic_time(millisecond) + Timeout,