Merge pull request #13561 from keynslug/fix/raft/bootstrap-wait-log

fix(dsraft): use shard readiness as criterion for reads availability
This commit is contained in:
Andrew Mayorov 2024-08-08 10:29:48 +03:00 committed by GitHub
commit 6849801293
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 173 additions and 37 deletions

View File

@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
%% TODO %% TODO
%% There's a possibility of race condition: storage may shut down right after we %% There's a possibility of race condition: storage may shut down right after we
%% ask for its status. %% ask for its status.
-define(IF_STORAGE_RUNNING(SHARDID, EXPR), -define(IF_SHARD_READY(DB, SHARD, EXPR),
case emqx_ds_storage_layer:shard_info(SHARDID, status) of case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
running -> EXPR; true -> EXPR;
down -> {error, recoverable, storage_down} false -> {error, recoverable, shard_unavailable}
end end
). ).
@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down). [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
ShardId = {DB, Shard}, ShardId = {DB, Shard},
?IF_STORAGE_RUNNING( ?IF_SHARD_READY(
ShardId, DB,
Shard,
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime) emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
). ).
@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
ShardId = {DB, Shard}, ShardId = {DB, Shard},
?IF_STORAGE_RUNNING( ?IF_SHARD_READY(
ShardId, DB,
Shard,
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime) emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
). ).
@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()). emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(DB, Shard, Iter, BatchSize) -> do_next_v1(DB, Shard, Iter, BatchSize) ->
ShardId = {DB, Shard}, ShardId = {DB, Shard},
?IF_STORAGE_RUNNING( ?IF_SHARD_READY(
ShardId, DB,
Shard,
emqx_ds_storage_layer:next( emqx_ds_storage_layer:next(
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
) )
@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
| emqx_ds:error(storage_down). | emqx_ds:error(storage_down).
do_list_generations_with_lifetimes_v3(DB, Shard) -> do_list_generations_with_lifetimes_v3(DB, Shard) ->
ShardId = {DB, Shard}, ShardId = {DB, Shard},
?IF_STORAGE_RUNNING( ?IF_SHARD_READY(
ShardId, DB,
Shard,
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId) emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
). ).

View File

@ -18,7 +18,8 @@
%% Dynamic server location API %% Dynamic server location API
-export([ -export([
servers/3 servers/3,
shard_info/3
]). ]).
%% Safe Process Command API %% Safe Process Command API
@ -38,8 +39,10 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([ -export([
init/1, init/1,
handle_continue/2,
handle_call/3, handle_call/3,
handle_cast/2, handle_cast/2,
handle_info/2,
terminate/2 terminate/2
]). ]).
@ -52,6 +55,9 @@
| {error, servers_unreachable}. | {error, servers_unreachable}.
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
-define(MAX_BOOSTRAP_RETRY_TIMEOUT, 1_000).
-define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}).
%% %%
@ -160,6 +166,12 @@ local_site() ->
%% %%
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), _Info) -> _Value.
shard_info(DB, Shard, ready) ->
get_shard_info(DB, Shard, ready, false).
%%
-spec process_command([server()], _Command, timeout()) -> -spec process_command([server()], _Command, timeout()) ->
{ok, _Result, _Leader :: server()} | server_error(). {ok, _Result, _Leader :: server()} | server_error().
process_command(Servers, Command, Timeout) -> process_command(Servers, Command, Timeout) ->
@ -324,10 +336,45 @@ ra_overview(Server) ->
%% %%
-record(st, {
db :: emqx_ds:db(),
shard :: emqx_ds_replication_layer:shard_id(),
server :: server(),
bootstrapped :: boolean(),
stage :: term()
}).
init({DB, Shard, Opts}) -> init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true), _ = process_flag(trap_exit, true),
ok = start_server(DB, Shard, Opts), case start_server(DB, Shard, Opts) of
{ok, {DB, Shard}}. {_New = true, Server} ->
NextStage = trigger_election;
{_New = false, Server} ->
NextStage = wait_leader
end,
St = #st{
db = DB,
shard = Shard,
server = Server,
bootstrapped = false,
stage = NextStage
},
{ok, St, {continue, bootstrap}}.
handle_continue(bootstrap, St = #st{bootstrapped = true}) ->
{noreply, St};
handle_continue(bootstrap, St0 = #st{db = DB, shard = Shard, stage = Stage}) ->
?tp(emqx_ds_replshard_bootstrapping, #{db => DB, shard => Shard, stage => Stage}),
case bootstrap(St0) of
St = #st{bootstrapped = true} ->
?tp(emqx_ds_replshard_bootstrapped, #{db => DB, shard => Shard}),
{noreply, St};
St = #st{bootstrapped = false} ->
{noreply, St, {continue, bootstrap}};
{retry, Timeout, St} ->
_TRef = erlang:start_timer(Timeout, self(), bootstrap),
{noreply, St}
end.
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
@ -335,7 +382,14 @@ handle_call(_Call, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, {DB, Shard}) -> handle_info({timeout, _TRef, bootstrap}, St) ->
{noreply, St, {continue, bootstrap}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #st{db = DB, shard = Shard}) ->
%% NOTE: Mark as not ready right away.
ok = erase_shard_info(DB, Shard),
%% NOTE: Timeouts are ignored, it's a best effort attempt. %% NOTE: Timeouts are ignored, it's a best effort attempt.
catch prep_stop_server(DB, Shard), catch prep_stop_server(DB, Shard),
LocalServer = get_local_server(DB, Shard), LocalServer = get_local_server(DB, Shard),
@ -343,6 +397,40 @@ terminate(_Reason, {DB, Shard}) ->
%% %%
bootstrap(St = #st{stage = trigger_election, server = Server}) ->
ok = trigger_election(Server),
St#st{stage = wait_leader};
bootstrap(St = #st{stage = wait_leader, server = Server}) ->
case current_leader(Server) of
Leader = {_, _} ->
St#st{stage = {wait_log, Leader}};
unknown ->
St
end;
bootstrap(St = #st{stage = {wait_log, Leader}}) ->
case ra_overview(Leader) of
#{commit_index := RaftIdx} ->
St#st{stage = {wait_log_index, RaftIdx}};
#{} ->
St#st{stage = wait_leader}
end;
bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) ->
Overview = ra_overview(Server),
case maps:get(last_applied, Overview, 0) of
LastApplied when LastApplied >= RaftIdx ->
ok = announce_shard_ready(DB, Shard),
St#st{bootstrapped = true, stage = undefined};
LastApplied ->
%% NOTE
%% Blunt estimate of time shard needs to catch up. If this proves to be too long in
%% practice, it's could be augmented with handling `recover` -> `follower` Ra
%% member state transition.
Timeout = min(RaftIdx - LastApplied, ?MAX_BOOSTRAP_RETRY_TIMEOUT),
{retry, Timeout, St}
end.
%%
start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
ClusterName = cluster_name(DB, Shard), ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard), LocalServer = local_server(DB, Shard),
@ -350,7 +438,6 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
MutableConfig = #{tick_timeout => 100}, MutableConfig = #{tick_timeout => 100},
case ra:restart_server(DB, LocalServer, MutableConfig) of case ra:restart_server(DB, LocalServer, MutableConfig) of
{error, name_not_registered} -> {error, name_not_registered} ->
Bootstrap = true,
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
LogOpts = maps:with( LogOpts = maps:with(
[ [
@ -366,30 +453,34 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
initial_members => Servers, initial_members => Servers,
machine => Machine, machine => Machine,
log_init_args => LogOpts log_init_args => LogOpts
}); }),
{_NewServer = true, LocalServer};
ok -> ok ->
Bootstrap = false; {_NewServer = false, LocalServer};
{error, {already_started, _}} -> {error, {already_started, _}} ->
Bootstrap = false {_NewServer = false, LocalServer}
end, end.
trigger_election(Server) ->
%% NOTE %% NOTE
%% Triggering election is necessary when a new consensus group is being brought up. %% Triggering election is necessary when a new consensus group is being brought up.
%% TODO %% TODO
%% It's probably a good idea to rebalance leaders across the cluster from time to %% It's probably a good idea to rebalance leaders across the cluster from time to
%% time. There's `ra:transfer_leadership/2` for that. %% time. There's `ra:transfer_leadership/2` for that.
try Bootstrap andalso ra:trigger_election(LocalServer, _Timeout = 1_000) of try ra:trigger_election(Server) of
false -> ok -> ok
ok;
ok ->
ok
catch catch
%% TODO %% NOTE
%% Tolerating exceptions because server might be occupied with log replay for %% Tolerating exceptions because server might be occupied with log replay for
%% a while. %% a while.
exit:{timeout, _} when not Bootstrap -> exit:{timeout, _} ->
?tp(emqx_ds_replshard_trigger_election, #{server => Server, error => timeout}),
ok ok
end. end.
announce_shard_ready(DB, Shard) ->
set_shard_info(DB, Shard, ready, true).
server_uid(_DB, Shard) -> server_uid(_DB, Shard) ->
%% NOTE %% NOTE
%% Each new "instance" of a server should have a unique identifier. Otherwise, %% Each new "instance" of a server should have a unique identifier. Otherwise,
@ -402,6 +493,22 @@ server_uid(_DB, Shard) ->
%% %%
get_shard_info(DB, Shard, K, Default) ->
persistent_term:get(?PTERM(DB, Shard, K), Default).
set_shard_info(DB, Shard, K, V) ->
persistent_term:put(?PTERM(DB, Shard, K), V).
erase_shard_info(DB, Shard) ->
lists:foreach(fun(K) -> erase_shard_info(DB, Shard, K) end, [
ready
]).
erase_shard_info(DB, Shard, K) ->
persistent_term:erase(?PTERM(DB, Shard, K)).
%%
prep_stop_server(DB, Shard) -> prep_stop_server(DB, Shard) ->
prep_stop_server(DB, Shard, 5_000). prep_stop_server(DB, Shard, 5_000).

View File

@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) ->
%% Initialize DB on all nodes and wait for it to be online. %% Initialize DB on all nodes and wait for it to be online.
Opts = opts(Config, #{n_shards => 1, n_sites => 3}), Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
assert_db_open(Nodes, ?DB, Opts), assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% Stop the DB on the "offline" node. %% Stop the DB on the "offline" node.
?wait_async_action( ?wait_async_action(
@ -207,7 +206,6 @@ t_rebalance(Config) ->
%% 1. Initialize DB on the first node. %% 1. Initialize DB on the first node.
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}), Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
assert_db_open(Nodes, ?DB, Opts), assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% 1.1 Kick all sites except S1 from the replica set as %% 1.1 Kick all sites except S1 from the replica set as
%% the initial condition: %% the initial condition:
@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) ->
%% Open DB: %% Open DB:
assert_db_open(Nodes, ?DB, Opts), assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
%% Kick N3 from the replica set as the initial condition: %% Kick N3 from the replica set as the initial condition:
?assertMatch( ?assertMatch(
@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) ->
%% Initialize DB on all 3 nodes. %% Initialize DB on all 3 nodes.
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}), Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
assert_db_open(Nodes, ?DB, Opts), assert_db_open(Nodes, ?DB, Opts),
assert_db_stable(Nodes, ?DB),
?retry( ?retry(
1000, 1000,
@ -845,13 +841,11 @@ t_crash_restart_recover(Config) ->
?check_trace( ?check_trace(
begin begin
%% Initialize DB on all nodes. %% Initialize DB on all nodes.
?assertEqual( assert_db_open(Nodes, ?DB, DBOpts),
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
),
%% Apply the test events, including simulated node crashes. %% Apply the test events, including simulated node crashes.
NodeStream = emqx_utils_stream:const(N1), NodeStream = emqx_utils_stream:const(N1),
StartedAt = erlang:monotonic_time(millisecond),
emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0),
%% It's expected to lose few messages when leaders are abruptly killed. %% It's expected to lose few messages when leaders are abruptly killed.
@ -865,6 +859,10 @@ t_crash_restart_recover(Config) ->
ct:pal("Some messages were lost: ~p", [LostMessages]), ct:pal("Some messages were lost: ~p", [LostMessages]),
?assert(length(LostMessages) < NMsgs div 20), ?assert(length(LostMessages) < NMsgs div 20),
%% Wait until crashed nodes are ready.
SinceStarted = erlang:monotonic_time(millisecond) - StartedAt,
wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted),
%% Verify that all the successfully persisted messages are there. %% Verify that all the successfully persisted messages are there.
VerifyClient = fun({ClientId, ExpectedStream}) -> VerifyClient = fun({ClientId, ExpectedStream}) ->
Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId), Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) ->
?assertEqual( ?assertEqual(
[{ok, ok} || _ <- Nodes], [{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts]) erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
). ),
wait_db_bootstrapped(Nodes, ?DB).
assert_db_stable([Node | _], DB) -> assert_db_stable([Node | _], DB) ->
Shards = ds_repl_meta(Node, shards, [DB]), Shards = ds_repl_meta(Node, shards, [DB]),
@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) ->
db_leadership(Node, DB, Shards) db_leadership(Node, DB, Shards)
). ).
wait_db_bootstrapped(Nodes, DB) ->
wait_db_bootstrapped(Nodes, DB, infinity, infinity).
wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) ->
SRefs = [
snabbkaffe:subscribe(
?match_event(#{
?snk_kind := emqx_ds_replshard_bootstrapped,
?snk_meta := #{node := Node},
db := DB,
shard := Shard
}),
1,
Timeout,
BackInTime
)
|| Node <- Nodes,
Shard <- ds_repl_meta(Node, my_shards, [DB])
],
lists:foreach(
fun({ok, SRef}) ->
?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef))
end,
SRefs
).
%% %%
db_leadership(Node, DB, Shards) -> db_leadership(Node, DB, Shards) ->