diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl index 12d621102..36dc654b4 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl @@ -18,7 +18,8 @@ %% Dynamic server location API -export([ - servers/3 + servers/3, + shard_info/3 ]). %% Safe Process Command API @@ -38,6 +39,7 @@ -behaviour(gen_server). -export([ init/1, + handle_continue/2, handle_call/3, handle_cast/2, terminate/2 @@ -160,6 +162,12 @@ local_site() -> %% +-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value. +shard_info(DB, Shard, ready) -> + persistent_term:get({?MODULE, DB, Shard}, false). + +%% + -spec process_command([server()], _Command, timeout()) -> {ok, _Result, _Leader :: server()} | server_error(). process_command(Servers, Command, Timeout) -> @@ -324,10 +332,38 @@ ra_overview(Server) -> %% +-record(st, { + db :: emqx_ds:db(), + shard :: emqx_ds_replication_layer:shard_id(), + server :: server(), + bootstrapped :: boolean(), + stage :: term() +}). + init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - ok = start_server(DB, Shard, Opts), - {ok, {DB, Shard}}. + case start_server(DB, Shard, Opts) of + {_New = true, Server} -> + NextStage = trigger_election; + {_New = false, Server} -> + NextStage = wait_leader + end, + St = #st{ + db = DB, + shard = Shard, + server = Server, + bootstrapped = false, + stage = NextStage + }, + {ok, St, {continue, bootstrap}}. + +handle_continue(bootstrap, St0) -> + case bootstrap(St0) of + St = #st{bootstrapped = true} -> + {noreply, St}; + St = #st{bootstrapped = false} -> + {noreply, St, {continue, bootstrap}} + end. handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -336,6 +372,8 @@ handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, {DB, Shard}) -> + %% FIXME + persistent_term:erase({?MODULE, DB, Shard}), %% NOTE: Timeouts are ignored, it's a best effort attempt. catch prep_stop_server(DB, Shard), LocalServer = get_local_server(DB, Shard), @@ -343,6 +381,34 @@ terminate(_Reason, {DB, Shard}) -> %% +bootstrap(St = #st{stage = trigger_election, server = Server}) -> + ok = trigger_election(Server), + St#st{stage = wait_leader}; +bootstrap(St = #st{stage = wait_leader, server = Server}) -> + case current_leader(Server) of + Leader = {_, _} -> + St#st{stage = {wait_log, Leader}}; + unknown -> + St + end; +bootstrap(St = #st{stage = {wait_log, Leader}}) -> + case ra_overview(Leader) of + #{commit_index := RaftIdx} -> + St#st{stage = {wait_log_index, RaftIdx}}; + #{} -> + St#st{stage = wait_leader} + end; +bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) -> + case ra_overview(Server) of + #{commit_index := RaftIdx} -> + ok = announce_shard_ready(DB, Shard), + St#st{bootstrapped = true, stage = undefined}; + #{} -> + St + end. + +%% + start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), @@ -350,7 +416,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> MutableConfig = #{tick_timeout => 100}, case ra:restart_server(DB, LocalServer, MutableConfig) of {error, name_not_registered} -> - Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, LogOpts = maps:with( [ @@ -366,30 +431,35 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> initial_members => Servers, machine => Machine, log_init_args => LogOpts - }); + }), + {_NewServer = true, LocalServer}; ok -> - Bootstrap = false; + {_NewServer = false, LocalServer}; {error, {already_started, _}} -> - Bootstrap = false - end, + {_NewServer = false, LocalServer} + end. + +trigger_election(Server) -> %% NOTE %% Triggering election is necessary when a new consensus group is being brought up. %% TODO %% It's probably a good idea to rebalance leaders across the cluster from time to %% time. There's `ra:transfer_leadership/2` for that. - try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of - false -> - ok; - ok -> - ok + try ra:trigger_election(Server) of + ok -> ok catch - %% TODO + %% NOTE %% Tolerating exceptions because server might be occupied with log replay for %% a while. - exit:{timeout, _} when not Bootstrap -> + exit:{timeout, _} -> + ?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}), ok end. +announce_shard_ready(DB, Shard) -> + %% FIXME + persistent_term:put({?MODULE, DB, Shard}, true). + server_uid(_DB, Shard) -> %% NOTE %% Each new "instance" of a server should have a unique identifier. Otherwise,