From 10dadbad3b00ee82f9389b8fde21a36a9376cd84 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Thu, 1 Aug 2024 19:02:22 +0200
Subject: [PATCH 1/7] fix(dsraft): add more involved shard bootstrapping

Namely, attempt to make sure the log is sufficiently replayed on the
shard server before announcing it is "ready".
---
 .../src/emqx_ds_replication_layer_shard.erl | 100 +++++++++++++++---
 1 file changed, 85 insertions(+), 15 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index 12d621102..36dc654b4 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -18,7 +18,8 @@
 
 %% Dynamic server location API
 -export([
-    servers/3
+    servers/3,
+    shard_info/3
 ]).
 
 %% Safe Process Command API
@@ -38,6 +39,7 @@ -behaviour(gen_server).
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     terminate/2
 ]).
@@ -160,6 +162,12 @@ local_site() ->
 
 %%
 
+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
+shard_info(DB, Shard, ready) ->
+    persistent_term:get({?MODULE, DB, Shard}, false).
+
+%%
+
 -spec process_command([server()], _Command, timeout()) ->
     {ok, _Result, _Leader :: server()} | server_error().
 process_command(Servers, Command, Timeout) ->
@@ -324,10 +332,38 @@ ra_overview(Server) ->
 
 %%
 
+-record(st, {
+    db :: emqx_ds:db(),
+    shard :: emqx_ds_replication_layer:shard_id(),
+    server :: server(),
+    bootstrapped :: boolean(),
+    stage :: term()
+}).
+
 init({DB, Shard, Opts}) ->
     _ = process_flag(trap_exit, true),
-    ok = start_server(DB, Shard, Opts),
-    {ok, {DB, Shard}}.
+    case start_server(DB, Shard, Opts) of
+        {_New = true, Server} ->
+            NextStage = trigger_election;
+        {_New = false, Server} ->
+            NextStage = wait_leader
+    end,
+    St = #st{
+        db = DB,
+        shard = Shard,
+        server = Server,
+        bootstrapped = false,
+        stage = NextStage
+    },
+    {ok, St, {continue, bootstrap}}.
+
+handle_continue(bootstrap, St0) ->
+    case bootstrap(St0) of
+        St = #st{bootstrapped = true} ->
+            {noreply, St};
+        St = #st{bootstrapped = false} ->
+            {noreply, St, {continue, bootstrap}}
+    end.
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
@@ -336,6 +372,8 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 terminate(_Reason, {DB, Shard}) ->
+    %% FIXME
+    persistent_term:erase({?MODULE, DB, Shard}),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.
     catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
@@ -343,6 +381,34 @@ terminate(_Reason, {DB, Shard}) ->
 
 %%
 
+bootstrap(St = #st{stage = trigger_election, server = Server}) ->
+    ok = trigger_election(Server),
+    St#st{stage = wait_leader};
+bootstrap(St = #st{stage = wait_leader, server = Server}) ->
+    case current_leader(Server) of
+        Leader = {_, _} ->
+            St#st{stage = {wait_log, Leader}};
+        unknown ->
+            St
+    end;
+bootstrap(St = #st{stage = {wait_log, Leader}}) ->
+    case ra_overview(Leader) of
+        #{commit_index := RaftIdx} ->
+            St#st{stage = {wait_log_index, RaftIdx}};
+        #{} ->
+            St#st{stage = wait_leader}
+    end;
+bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
+    case ra_overview(Server) of
+        #{commit_index := RaftIdx} ->
+            ok = announce_shard_ready(DB, Shard),
+            St#st{bootstrapped = true, stage = undefined};
+        #{} ->
+            St
+    end.
+
+%%
+
 start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
@@ -350,7 +416,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
     MutableConfig = #{tick_timeout => 100},
     case ra:restart_server(DB, LocalServer, MutableConfig) of
         {error, name_not_registered} ->
-            Bootstrap = true,
             Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
             LogOpts = maps:with(
                 [
@@ -366,30 +431,35 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
                 initial_members => Servers,
                 machine => Machine,
                 log_init_args => LogOpts
-            });
+            }),
+            {_NewServer = true, LocalServer};
         ok ->
-            Bootstrap = false;
+            {_NewServer = false, LocalServer};
         {error, {already_started, _}} ->
-            Bootstrap = false
-    end,
+            {_NewServer = false, LocalServer}
+    end.
+
+trigger_election(Server) ->
     %% NOTE
     %% Triggering election is necessary when a new consensus group is being brought up.
     %% TODO
     %% It's probably a good idea to rebalance leaders across the cluster from time to
     %% time. There's `ra:transfer_leadership/2` for that.
-    try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of
-        false ->
-            ok;
-        ok ->
-            ok
+    try ra:trigger_election(Server) of
+        ok -> ok
     catch
-        %% TODO
+        %% NOTE
        %% Tolerating exceptions because server might be occupied with log replay for
        %% a while.
-        exit:{timeout, _} when not Bootstrap ->
+        exit:{timeout, _} ->
+            ?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
             ok
     end.
 
+announce_shard_ready(DB, Shard) ->
+    %% FIXME
+    persistent_term:put({?MODULE, DB, Shard}, true).
+
 server_uid(_DB, Shard) ->
     %% NOTE
     %% Each new "instance" of a server should have a unique identifier. Otherwise,

From 5b158868366b2894b955ada9afc6da09f56bc59e Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Thu, 1 Aug 2024 19:04:33 +0200
Subject: [PATCH 2/7] fix(dsraft): use shard readiness as criterion for read availability

---
 .../src/emqx_ds_replication_layer.erl | 28 +++++++++++--------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
index 11c809dbd..42b4b87b9 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
@@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_STORAGE_RUNNING(SHARDID, EXPR),
-    case emqx_ds_storage_layer:shard_info(SHARDID, status) of
-        running -> EXPR;
-        down -> {error, recoverable, storage_down}
+-define(IF_SHARD_READY(DB, SHARD, EXPR),
+    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+        true -> EXPR;
+        false -> {error, recoverable, shard_unavailable}
     end
 ).
 
@@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
     [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).
 
@@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
     emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).
 
@@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
     emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )
@@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
     | emqx_ds:error(storage_down).
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
-    ?IF_STORAGE_RUNNING(
-        ShardId,
+    ?IF_SHARD_READY(
+        DB,
+        Shard,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).

From 4971fd3eafbcd9956f81c97af8f23a83d2dcfb91 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 2 Aug 2024 09:52:55 +0200
Subject: [PATCH 3/7] chore(dsraft): make shard info pterms saner and more visible

---
 .../src/emqx_ds_replication_layer_shard.erl | 27 +++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index 36dc654b4..bb0a9c99d 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -55,6 +55,8 @@
 
 -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
 
+-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
+
 %%
 
 start_link(DB, Shard, Opts) ->
@@ -164,7 +166,7 @@ local_site() ->
 
 -spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
 shard_info(DB, Shard, ready) ->
-    persistent_term:get({?MODULE, DB, Shard}, false).
+    get_shard_info(DB, Shard, ready, false).
 
 %%
 
@@ -372,8 +374,8 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 terminate(_Reason, {DB, Shard}) ->
-    %% FIXME
-    persistent_term:erase({?MODULE, DB, Shard}),
+    %% NOTE: Mark as not ready right away.
+    ok = erase_shard_info(DB, Shard),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.
     catch prep_stop_server(DB, Shard),
     LocalServer = get_local_server(DB, Shard),
@@ -457,8 +459,7 @@ trigger_election(Server) ->
     end.
 
 announce_shard_ready(DB, Shard) ->
-    %% FIXME
-    persistent_term:put({?MODULE, DB, Shard}, true).
+    set_shard_info(DB, Shard, ready, true).
 
 server_uid(_DB, Shard) ->
     %% NOTE
@@ -472,6 +473,22 @@ server_uid(_DB, Shard) ->
 
 %%
 
+get_shard_info(DB, Shard, K, Default) ->
+    persistent_term:get(?PTERM(DB, Shard, K), Default).
+
+set_shard_info(DB, Shard, K, V) ->
+    persistent_term:put(?PTERM(DB, Shard, K), V).
+
+erase_shard_info(DB, Shard) ->
+    lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
+        ready
+    ]).
+
+erase_shard_info(DB, Shard, K) ->
+    persistent_term:erase(?PTERM(DB, Shard, K)).
+
+%%
+
 prep_stop_server(DB, Shard) ->
     prep_stop_server(DB, Shard, 5_000).
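Since the readiness flag is a plain persistent term after this patch, callers can poll it cheaply. A minimal sketch of such a caller, assuming only the `shard_info/3` export from PATCH 1 and the error shape of the `?IF_SHARD_READY` macro from PATCH 2; `wait_shard_ready/3` itself is hypothetical and not part of this series:

wait_shard_ready(DB, Shard, Retries) when Retries > 0 ->
    case emqx_ds_replication_layer_shard:shard_info(DB, Shard, ready) of
        true ->
            ok;
        false ->
            %% Shard server is still replaying its Raft log: back off and retry.
            ok = timer:sleep(100),
            wait_shard_ready(DB, Shard, Retries - 1)
    end;
wait_shard_ready(_DB, _Shard, 0) ->
    %% Same error shape as the ?IF_SHARD_READY macro in PATCH 2.
    {error, recoverable, shard_unavailable}.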
From 26ddc403c8dc61a96caa01b9ce2bdc09ef192ebc Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 5 Aug 2024 16:32:19 +0200
Subject: [PATCH 4/7] fix(dsraft): avoid tight loop in shard bootstrap

---
 .../src/emqx_ds_replication_layer_shard.erl | 26 +++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index bb0a9c99d..37193eaf4 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -42,6 +42,7 @@
     handle_continue/2,
     handle_call/3,
     handle_cast/2,
+    handle_info/2,
     terminate/2
 ]).
@@ -54,6 +55,7 @@
     | {error, servers_unreachable}.
 
 -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
+-define(MAX_BOOTSTRAP_RETRY_TIMEOUT, 1_000).
 
 -define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
@@ -359,12 +361,17 @@ init({DB, Shard, Opts}) ->
     },
     {ok, St, {continue, bootstrap}}.
 
+handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
+    {noreply, St};
 handle_continue(bootstrap, St0) ->
     case bootstrap(St0) of
         St = #st{bootstrapped = true} ->
             {noreply, St};
         St = #st{bootstrapped = false} ->
-            {noreply, St, {continue, bootstrap}}
+            {noreply, St, {continue, bootstrap}};
+        {retry, Timeout, St} ->
+            _TRef = erlang:start_timer(Timeout, self(), bootstrap),
+            {noreply, St}
     end.
 
 handle_call(_Call, _From, State) ->
@@ -373,6 +380,9 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
+handle_info({timeout, _TRef, bootstrap}, St) ->
+    {noreply, St, {continue, bootstrap}}.
+
 terminate(_Reason, {DB, Shard}) ->
     %% NOTE: Mark as not ready right away.
     ok = erase_shard_info(DB, Shard),
@@ -401,12 +411,18 @@ bootstrap(St = #st{stage = {wait_log, Leader}}) ->
             St#st{stage = wait_leader}
     end;
 bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
-    case ra_overview(Server) of
-        #{commit_index := RaftIdx} ->
+    Overview = ra_overview(Server),
+    case maps:get(last_applied, Overview, 0) of
+        LastApplied when LastApplied >= RaftIdx ->
             ok = announce_shard_ready(DB, Shard),
             St#st{bootstrapped = true, stage = undefined};
-        #{} ->
-            St
+        LastApplied ->
+            %% NOTE
+            %% Blunt estimate of the time the shard needs to catch up. If this proves to
+            %% be too long in practice, it could be augmented by handling the `recover` ->
+            %% `follower` Ra member state transition.
+            Timeout = min(RaftIdx - LastApplied, ?MAX_BOOTSTRAP_RETRY_TIMEOUT),
+            {retry, Timeout, St}
     end.
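%% For illustration, assuming the 1_000 ms cap defined above: a server whose
%% `last_applied` index is 40_000 entries behind the leader's `commit_index`
%% retries after min(40_000, 1_000) = 1_000 ms, while one only 50 entries
%% behind retries after min(50, 1_000) = 50 ms.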
 
 %%
 
From 42e4a635e0b5e6595901c052403168914a13a745 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 5 Aug 2024 20:14:16 +0200
Subject: [PATCH 5/7] chore(dsraft): sprinkle shard bootstrap process with tracepoints

---
 .../src/emqx_ds_replication_layer_shard.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index 37193eaf4..f67da6bab 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -363,9 +363,11 @@ init({DB, Shard, Opts}) ->
 
 handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
     {noreply, St};
-handle_continue(bootstrap, St0) ->
+handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
+    ?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
     case bootstrap(St0) of
         St = #st{bootstrapped = true} ->
+            ?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
             {noreply, St};
         St = #st{bootstrapped = false} ->
             {noreply, St, {continue, bootstrap}};

From ff72d55491574b9ce1f8415ac398d904751a0766 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 6 Aug 2024 19:57:57 +0200
Subject: [PATCH 6/7] fix(dsraft): replace unused clause with catch-all one

Co-authored-by: Thales Macedo Garitezi
---
 .../src/emqx_ds_replication_layer_shard.erl | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index f67da6bab..31496c7a8 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -383,9 +383,11 @@ handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, _TRef, bootstrap}, St) ->
-    {noreply, St, {continue, bootstrap}}.
+    {noreply, St, {continue, bootstrap}};
+handle_info(_Info, State) ->
+    {noreply, State}.
 
-terminate(_Reason, {DB, Shard}) ->
+terminate(_Reason, #st{db = DB, shard = Shard}) ->
     %% NOTE: Mark as not ready right away.
     ok = erase_shard_info(DB, Shard),
     %% NOTE: Timeouts are ignored, it's a best effort attempt.

From 8d88d14f0acb29a8bd0961919d8fbba37def684a Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Wed, 7 Aug 2024 10:38:22 +0200
Subject: [PATCH 7/7] test(dsraft): use bootstrap as readiness criterion

In another attempt to stabilize the rest of the flaky testcases.
---
 .../test/emqx_ds_replication_SUITE.erl | 43 +++++++++++++++----
 1 file changed, 34 insertions(+), 9 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 96edd1043..66d94225e 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
     %% Initialize DB on all nodes and wait for it to be online.
     Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Stop the DB on the "offline" node.
     ?wait_async_action(
@@ -207,7 +206,6 @@ t_rebalance(Config) ->
 
     %% 1. Initialize DB on the first node.
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% 1.1 Kick all sites except S1 from the replica set as
     %% the initial condition:
@@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
 
     %% Open DB:
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     %% Kick N3 from the replica set as the initial condition:
     ?assertMatch(
@@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
     %% Initialize DB on all 3 nodes.
     Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
     assert_db_open(Nodes, ?DB, Opts),
-    assert_db_stable(Nodes, ?DB),
 
     ?retry(
         1000,
@@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
     ?check_trace(
         begin
             %% Initialize DB on all nodes.
-            ?assertEqual(
-                [{ok, ok} || _ <- Nodes],
-                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
-            ),
+            assert_db_open(Nodes, ?DB, DBOpts),
 
             %% Apply the test events, including simulated node crashes.
             NodeStream = emqx_utils_stream:const(N1),
+            StartedAt = erlang:monotonic_time(millisecond),
             emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
 
             %% It's expected to lose few messages when leaders are abruptly killed.
@@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
             ct:pal("Some messages were lost: ~p", [LostMessages]),
             ?assert(length(LostMessages) < NMsgs div 20),
 
+            %% Wait until crashed nodes are ready.
+            SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
+            wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
+
             %% Verify that all the successfully persisted messages are there.
             VerifyClient = fun({ClientId, ExpectedStream}) ->
                 Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
     ?assertEqual(
         [{ok, ok} || _ <- Nodes],
         erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
-    ).
+    ),
+    wait_db_bootstrapped(Nodes, DB).
 
 assert_db_stable([Node | _], DB) ->
     Shards = ds_repl_meta(Node, shards, [DB]),
     ?assertEqual(
         db_leadership(Node, DB, Shards)
     ).
 
+wait_db_bootstrapped(Nodes, DB) ->
+    wait_db_bootstrapped(Nodes, DB, infinity, infinity).
+
+wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
+    SRefs = [
+        snabbkaffe:subscribe(
+            ?match_event(#{
+                ?snk_kind := emqx_ds_replshard_bootstrapped,
+                ?snk_meta := #{node := Node},
+                db := DB,
+                shard := Shard
+            }),
+            1,
+            Timeout,
+            BackInTime
+        )
+     || Node <- Nodes,
+        Shard <- ds_repl_meta(Node, my_shards, [DB])
+    ],
+    lists:foreach(
+        fun({ok, SRef}) ->
+            ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
+        end,
+        SRefs
+    ).
 
 %%
 
 db_leadership(Node, DB, Shards) ->