fix(dsraft): add more involved shard bootstrapping

Namely, attempt to make sure the log is sufficiently replayed on the
shard server before announcing it "ready".
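For illustration, a caller could block on the readiness flag this commit introduces. The helper below is a hypothetical sketch, not part of the commit: `wait_shard_ready/3` and the 100 ms polling interval are made up, and the code assumes it lives in (or imports from) the module changed below so that `shard_info/3` is in scope.

%% Sketch: poll the readiness flag until the shard reports ready,
%% giving up after a bounded number of retries.
wait_shard_ready(_DB, _Shard, Retries) when Retries =< 0 ->
    {error, timeout};
wait_shard_ready(DB, Shard, Retries) ->
    case shard_info(DB, Shard, ready) of
        true ->
            ok;
        false ->
            ok = timer:sleep(100),
            wait_shard_ready(DB, Shard, Retries - 1)
    end.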
Andrew Mayorov 2024-08-01 19:02:22 +02:00
parent cf608a73a5
commit 10dadbad3b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed file with 85 additions and 15 deletions

@@ -18,7 +18,8 @@
 %% Dynamic server location API
 -export([
-    servers/3
+    servers/3,
+    shard_info/3
 ]).

 %% Safe Process Command API
@@ -38,6 +39,7 @@
 -behaviour(gen_server).

 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     terminate/2
@@ -160,6 +162,12 @@ local_site() ->
 %%

+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
+shard_info(DB, Shard, ready) ->
+    persistent_term:get({?MODULE, DB, Shard}, false).
+
+%%
+
 -spec process_command([server()], _Command, timeout()) ->
     {ok, _Result, _Leader :: server()} | server_error().
 process_command(Servers, Command, Timeout) ->
@@ -324,10 +332,38 @@ ra_overview(Server) ->
 %%

+-record(st, {
+    db :: emqx_ds:db(),
+    shard :: emqx_ds_replication_layer:shard_id(),
+    server :: server(),
+    bootstrapped :: boolean(),
+    stage :: term()
+}).
+
 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_server(DB, Shard, Opts),
-    {ok, {DB, Shard}}.
+    case start_server(DB, Shard, Opts) of
+        {_New = true, Server} ->
+            NextStage = trigger_election;
+        {_New = false, Server} ->
+            NextStage = wait_leader
+    end,
+    St = #st{
+        db = DB,
+        shard = Shard,
+        server = Server,
+        bootstrapped = false,
+        stage = NextStage
+    },
+    {ok, St, {continue, bootstrap}}.
+
+handle_continue(bootstrap, St0) ->
+    case bootstrap(St0) of
+        St = #st{bootstrapped = true} ->
+            {noreply, St};
+        St = #st{bootstrapped = false} ->
+            {noreply, St, {continue, bootstrap}}
+    end.
+
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -336,6 +372,8 @@ handle_cast(_Msg, State) ->
     {noreply, State}.

 terminate(_Reason, {DB, Shard}) ->
+    %% FIXME
+    persistent_term:erase({?MODULE, DB, Shard}),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.
     catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
@@ -343,6 +381,34 @@ terminate(_Reason, {DB, Shard}) ->
 %%

+bootstrap(St = #st{stage = trigger_election, server = Server}) ->
+    ok = trigger_election(Server),
+    St#st{stage = wait_leader};
+bootstrap(St = #st{stage = wait_leader, server = Server}) ->
+    case current_leader(Server) of
+        Leader = {_, _} ->
+            St#st{stage = {wait_log, Leader}};
+        unknown ->
+            St
+    end;
+bootstrap(St = #st{stage = {wait_log, Leader}}) ->
+    case ra_overview(Leader) of
+        #{commit_index := RaftIdx} ->
+            St#st{stage = {wait_log_index, RaftIdx}};
+        #{} ->
+            St#st{stage = wait_leader}
+    end;
+bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
+    case ra_overview(Server) of
+        #{commit_index := RaftIdx} ->
+            ok = announce_shard_ready(DB, Shard),
+            St#st{bootstrapped = true, stage = undefined};
+        #{} ->
+            St
+    end.
+
+%%
+
 start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
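In short, the bootstrap sequence above snapshots the leader's commit index once a leader is known, then declares the shard ready when the local server's commit index catches up to that snapshot. A standalone sketch of the catch-up predicate, for illustration only: `is_caught_up/2` does not exist in the commit, and it accepts any index at or past the snapshot, whereas the clause above pattern-matches the exact value.

%% Sketch: has the local server replayed the log at least up to the
%% commit index observed on the leader when bootstrap started?
is_caught_up(LeaderCommitIdx, LocalOverview) ->
    case LocalOverview of
        #{commit_index := LocalIdx} when LocalIdx >= LeaderCommitIdx ->
            true;
        #{} ->
            false
    end.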
@@ -350,7 +416,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     MutableConfig = #{tick_timeout => 100},
     case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
-            Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             LogOpts = maps:with(
                 [
@@ -366,30 +431,35 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            });
+            }),
+            {_NewServer = true, LocalServer};
         ok ->
-            Bootstrap = false;
+            {_NewServer = false, LocalServer};
         {error, {already_started, _}} ->
-            Bootstrap = false
-    end,
+            {_NewServer = false, LocalServer}
+    end.
+
+trigger_election(Server) ->
     %% NOTE
     %% Triggering election is necessary when a new consensus group is being brought up.
     %% TODO
     %% It's probably a good idea to rebalance leaders across the cluster from time to
     %% time. There's `ra:transfer_leadership/2` for that.
-    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
-        false ->
-            ok;
-        ok ->
-            ok
+    try ra:trigger_election(Server) of
+        ok -> ok
     catch
-        %% TODO
+        %% NOTE
         %% Tolerating exceptions because server might be occupied with log replay for
         %% a while.
-        exit:{timeout, _} when not Bootstrap ->
+        exit:{timeout, _} ->
+            ?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
             ok
     end.
+
+announce_shard_ready(DB, Shard) ->
+    %% FIXME
+    persistent_term:put({?MODULE, DB, Shard}, true).
+
 server_uid(_DB, Shard) ->
     %% NOTE
     %% Each new "instance" of a server should have a unique identifier. Otherwise,