diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
index 11c809dbd..42b4b87b9 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
@@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_STORAGE_RUNNING(SHARDID, EXPR),
-    case emqx_ds_storage_layer:shard_info(SHARDID, status) of
-        running -> EXPR;
-        down -> {error, recoverable, storage_down}
+-define(IF_SHARD_READY(DB, SHARD, EXPR),
+    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+        true -> EXPR;
+        false -> {error, recoverable, shard_unavailable}
     end
 ).
 
@@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
     [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).
 
@@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
     emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).
 
@@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
     emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )
     ).
 
@@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
     | emqx_ds:error(storage_down).
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).
 
diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index 12d621102..31496c7a8 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -18,7 +18,8 @@
 
 %% Dynamic server location API
 -export([
-    servers/3
+    servers/3,
+    shard_info/3
 ]).
 
 %% Safe Process Command API
@@ -38,8 +39,10 @@
 -behaviour(gen_server).
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
+    handle_info/2,
     terminate/2
 ]).
 
@@ -52,6 +55,9 @@
     | {error, servers_unreachable}.
 
 -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
+-define(MAX_BOOTSTRAP_RETRY_TIMEOUT, 1_000).
+
+-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
 
 %%
 
@@ -160,6 +166,12 @@ local_site() ->
 
 %%
 
+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
+shard_info(DB, Shard, ready) ->
+    get_shard_info(DB, Shard, ready, false).
+
+%%
+
 -spec process_command([server()], _Command, timeout()) ->
     {ok, _Result, _Leader :: server()} | server_error().
 process_command(Servers, Command, Timeout) ->
@@ -324,10 +336,45 @@ ra_overview(Server) ->
 
 %%
 
+-record(st, {
+    db :: emqx_ds:db(),
+    shard :: emqx_ds_replication_layer:shard_id(),
+    server :: server(),
+    bootstrapped :: boolean(),
+    stage :: term()
+}).
+
 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_server(DB, Shard, Opts),
-    {ok, {DB, Shard}}.
+    case start_server(DB, Shard, Opts) of
+        {_New = true, Server} ->
+            NextStage = trigger_election;
+        {_New = false, Server} ->
+            NextStage = wait_leader
+    end,
+    St = #st{
+        db = DB,
+        shard = Shard,
+        server = Server,
+        bootstrapped = false,
+        stage = NextStage
+    },
+    {ok, St, {continue, bootstrap}}.
+
+handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
+    {noreply, St};
+handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
+    ?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
+    case bootstrap(St0) of
+        St = #st{bootstrapped = true} ->
+            ?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
+            {noreply, St};
+        St = #st{bootstrapped = false} ->
+            {noreply, St, {continue, bootstrap}};
+        {retry, Timeout, St} ->
+            _TRef = erlang:start_timer(Timeout, self(), bootstrap),
+            {noreply, St}
+    end.
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -335,7 +382,14 @@ handle_call(_Call, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-terminate(_Reason, {DB, Shard}) ->
+handle_info({timeout, _TRef, bootstrap}, St) ->
+    {noreply, St, {continue, bootstrap}};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #st{db = DB, shard = Shard}) ->
+    %% NOTE: Mark as not ready right away.
+    ok = erase_shard_info(DB, Shard),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.
     catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
@@ -343,6 +397,40 @@ terminate(_Reason, {DB, Shard}) ->
 
 %%
 
+bootstrap(St = #st{stage = trigger_election, server = Server}) ->
+    ok = trigger_election(Server),
+    St#st{stage = wait_leader};
+bootstrap(St = #st{stage = wait_leader, server = Server}) ->
+    case current_leader(Server) of
+        Leader = {_, _} ->
+            St#st{stage = {wait_log, Leader}};
+        unknown ->
+            St
+    end;
+bootstrap(St = #st{stage = {wait_log, Leader}}) ->
+    case ra_overview(Leader) of
+        #{commit_index := RaftIdx} ->
+            St#st{stage = {wait_log_index, RaftIdx}};
+        #{} ->
+            St#st{stage = wait_leader}
+    end;
+bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
+    Overview = ra_overview(Server),
+    case maps:get(last_applied, Overview, 0) of
+        LastApplied when LastApplied >= RaftIdx ->
+            ok = announce_shard_ready(DB, Shard),
+            St#st{bootstrapped = true, stage = undefined};
+        LastApplied ->
+            %% NOTE
+            %% Blunt estimate of the time the shard needs to catch up. If this proves to be
+            %% too long in practice, it could be augmented with handling the `recover` ->
+            %% `follower` Ra member state transition.
+            Timeout = min(RaftIdx - LastApplied, ?MAX_BOOTSTRAP_RETRY_TIMEOUT),
+            {retry, Timeout, St}
+    end.
+
+%%
+
 start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
@@ -350,7 +438,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     MutableConfig = #{tick_timeout => 100},
     case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
-            Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             LogOpts = maps:with(
                 [
@@ -366,30 +453,34 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            });
+            }),
+            {_NewServer = true, LocalServer};
         ok ->
-            Bootstrap = false;
+            {_NewServer = false, LocalServer};
         {error, {already_started, _}} ->
-            Bootstrap = false
-    end,
+            {_NewServer = false, LocalServer}
+    end.
+
+trigger_election(Server) ->
     %% NOTE
     %% Triggering election is necessary when a new consensus group is being brought up.
     %% TODO
     %% It's probably a good idea to rebalance leaders across the cluster from time to
     %% time. There's `ra:transfer_leadership/2` for that.
-    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
-        false ->
-            ok;
-        ok ->
-            ok
+    try ra:trigger_election(Server) of
+        ok -> ok
     catch
-        %% TODO
+        %% NOTE
        %% Tolerating exceptions because server might be occupied with log replay for
        %% a while.
-        exit:{timeout, _} when not Bootstrap ->
+        exit:{timeout, _} ->
+            ?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
             ok
     end.
 
+announce_shard_ready(DB, Shard) ->
+    set_shard_info(DB, Shard, ready, true).
+
 server_uid(_DB, Shard) ->
     %% NOTE
     %% Each new "instance" of a server should have a unique identifier. Otherwise,
@@ -402,6 +493,22 @@ server_uid(_DB, Shard) ->
 
 %%
 
+get_shard_info(DB, Shard, K, Default) ->
+    persistent_term:get(?PTERM(DB, Shard, K), Default).
+
+set_shard_info(DB, Shard, K, V) ->
+    persistent_term:put(?PTERM(DB, Shard, K), V).
+
+erase_shard_info(DB, Shard) ->
+    lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
+        ready
+    ]).
+
+erase_shard_info(DB, Shard, K) ->
+    persistent_term:erase(?PTERM(DB, Shard, K)).
+
+%%
+
 prep_stop_server(DB, Shard) ->
     prep_stop_server(DB, Shard, 5_000).
 
diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 96edd1043..66d94225e 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
     %% Initialize DB on all nodes and wait for it to be online.
     Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Stop the DB on the "offline" node.
     ?wait_async_action(
@@ -207,7 +206,6 @@ t_rebalance(Config) ->
     %% 1. Initialize DB on the first node.
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% 1.1 Kick all sites except S1 from the replica set as
     %% the initial condition:
@@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
 
     %% Open DB:
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Kick N3 from the replica set as the initial condition:
     ?assertMatch(
@@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
     %% Initialize DB on all 3 nodes.
     Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     ?retry(
         1000,
@@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
     ?check_trace(
         begin
             %% Initialize DB on all nodes.
-            ?assertEqual(
-                [{ok, ok} || _ <- Nodes],
-                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
-            ),
+            assert_db_open(Nodes, ?DB, DBOpts),
 
             %% Apply the test events, including simulated node crashes.
             NodeStream = emqx_utils_stream:const(N1),
+            StartedAt = erlang:monotonic_time(millisecond),
             emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
 
             %% It's expected to lose few messages when leaders are abruptly killed.
@@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
             ct:pal("Some messages were lost: ~p", [LostMessages]),
             ?assert(length(LostMessages) < NMsgs div 20),
 
+            %% Wait until crashed nodes are ready.
+            SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
+            wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
+
             %% Verify that all the successfully persisted messages are there.
             VerifyClient = fun({ClientId, ExpectedStream}) ->
                 Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
     ?assertEqual(
         [{ok, ok} || _ <- Nodes],
         erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
-    ).
+    ),
+    wait_db_bootstrapped(Nodes, DB).
 
 assert_db_stable([Node | _], DB) ->
     Shards = ds_repl_meta(Node, shards, [DB]),
@@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
         db_leadership(Node, DB, Shards)
     ).
 
+wait_db_bootstrapped(Nodes, DB) ->
+    wait_db_bootstrapped(Nodes, DB, infinity, infinity).
+
+wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
+    SRefs = [
+        snabbkaffe:subscribe(
+            ?match_event(#{
+                ?snk_kind := emqx_ds_replshard_bootstrapped,
+                ?snk_meta := #{node := Node},
+                db := DB,
+                shard := Shard
+            }),
+            1,
+            Timeout,
+            BackInTime
+        )
+     || Node <- Nodes,
+        Shard <- ds_repl_meta(Node, my_shards, [DB])
+    ],
+    lists:foreach(
+        fun({ok, SRef}) ->
+            ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
+        end,
+        SRefs
+    ).
+
 %%
 
 db_leadership(Node, DB, Shards) ->
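
Below is a minimal usage sketch, not part of the changeset above. It illustrates how a caller outside the replication layer could poll the readiness flag introduced here before issuing reads, rather than going through the `?IF_SHARD_READY` macro. The module name and `wait_local_shards_ready/2` are hypothetical, and it assumes `emqx_ds_replication_layer_meta:my_shards/1` returns the shards hosted on the local site, mirroring the `my_shards` lookup used by the test suite.

%% Hypothetical helper: assumes emqx_ds_replication_layer_meta:my_shards/1 lists
%% the shards hosted on the local site.
-module(ds_shard_ready_example).

-export([wait_local_shards_ready/2]).

%% Poll until every locally hosted shard of `DB` reports `ready` via
%% emqx_ds_replication_layer_shard:shard_info/3, or give up after `Timeout` ms.
-spec wait_local_shards_ready(emqx_ds:db(), non_neg_integer()) -> ok | {error, timeout}.
wait_local_shards_ready(DB, Timeout) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    Shards = emqx_ds_replication_layer_meta:my_shards(DB),
    wait_loop(DB, Shards, Deadline).

wait_loop(_DB, [], _Deadline) ->
    ok;
wait_loop(DB, [Shard | Rest] = Shards, Deadline) ->
    case emqx_ds_replication_layer_shard:shard_info(DB, Shard, ready) of
        true ->
            %% This shard has caught up with the leader's log; check the rest.
            wait_loop(DB, Rest, Deadline);
        false ->
            case erlang:monotonic_time(millisecond) of
                Now when Now >= Deadline ->
                    {error, timeout};
                _Now ->
                    %% Back off briefly before re-reading the persistent_term flag.
                    timer:sleep(100),
                    wait_loop(DB, Shards, Deadline)
            end
    end.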