diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 2dd9ae332..ef1600500 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -232,6 +232,7 @@ shard_replication_spec(DB, Shard, Opts) -> #{ id => {Shard, replication}, start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard, Opts]}, + shutdown => 10_000, restart => permanent, type => worker }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 20d9ef481..e0e70596a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -189,22 +189,9 @@ add_local_server(DB, Shard) -> -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> - ShardServers = shard_servers(DB, Shard), + %% NOTE: Timeouts are ignored, it's a best effort attempt. + _ = prep_stop_server(DB, Shard), LocalServer = local_server(DB, Shard), - case lookup_leader(DB, Shard) of - LocalServer -> - %% NOTE - %% Trigger leadership transfer *and* force to wait until the new leader - %% is elected and updated in the leaderboard. This should help to avoid - %% edge cases where entries appended right before removal are duplicated - %% due to client retries. - %% Timeouts are ignored, it's a best effort attempt. - [Candidate | _] = lists:delete(LocalServer, ShardServers), - _ = ra:transfer_leadership(LocalServer, Candidate), - _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); - _Another -> - ok - end, case remove_server(DB, Shard, LocalServer) of ok -> ra:force_delete_server(DB, LocalServer); @@ -300,7 +287,7 @@ ra_overview(Server) -> init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - ok = start_shard(DB, Shard, Opts), + ok = start_server(DB, Shard, Opts), {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> @@ -310,18 +297,18 @@ handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, {DB, Shard}) -> + %% NOTE: Timeouts are ignored, it's a best effort attempt. + catch prep_stop_server(DB, Shard), LocalServer = get_local_server(DB, Shard), ok = ra:stop_server(DB, LocalServer). %% -start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> +start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), case ra:restart_server(DB, LocalServer) of - ok -> - Bootstrap = false; {error, name_not_registered} -> Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, @@ -339,7 +326,11 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> initial_members => Servers, machine => Machine, log_init_args => LogOpts - }) + }); + ok -> + Bootstrap = false; + {error, {already_started, _}} -> + Bootstrap = false end, %% NOTE %% Triggering election is necessary when a new consensus group is being brought up. @@ -371,6 +362,29 @@ server_uid(_DB, Shard) -> %% +prep_stop_server(DB, Shard) -> + prep_stop_server(DB, Shard, 5_000). + +prep_stop_server(DB, Shard, Timeout) -> + LocalServer = get_local_server(DB, Shard), + Candidates = lists:delete(LocalServer, shard_servers(DB, Shard)), + case lookup_leader(DB, Shard) of + LocalServer when Candidates =/= [] -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% TODO: Candidate may be offline. + [Candidate | _] = Candidates, + _ = ra:transfer_leadership(LocalServer, Candidate), + wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, Timeout); + _Another -> + ok + end. + +%% + memoize(Fun, Args) -> %% NOTE: Assuming that the function is pure and never returns `undefined`. case persistent_term:get([Fun | Args], undefined) of @@ -382,8 +396,8 @@ memoize(Fun, Args) -> Result end. -wait_until(Fun) -> - wait_until(Fun, 5_000, 250). +wait_until(Fun, Timeout) -> + wait_until(Fun, Timeout, 100). wait_until(Fun, Timeout, Sleep) -> Deadline = erlang:monotonic_time(millisecond) + Timeout,