test(dsrepl): add crash-restart-recover testcase

The new testcase verifies that no data is lost in the event of abrupt node failures.
This commit is contained in:
Andrew Mayorov 2024-06-14 16:43:33 +02:00
parent ae89b61af0
commit 8db70b5bbc
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 75 additions and 2 deletions

View File

@ -794,6 +794,77 @@ t_store_batch_fail(_Config) ->
]
).
%% Per-testcase setup / teardown for `t_crash_restart_recover/1`.
%%
%% `init`: builds node specs for three peer nodes, each configured with the
%% `emqx_durable_storage` appspec and the `emqx_ds_builtin_raft` application,
%% starts them, and stores both the started nodes (`nodes`) and their specs
%% (`nodespecs`) in the CT config. The specs are kept so the testcase can
%% restart individual nodes later.
t_crash_restart_recover(init, Config) ->
    NodeApps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
    NodeNames = [t_crash_stop_recover1, t_crash_stop_recover2, t_crash_stop_recover3],
    ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)},
    Specs = emqx_cth_cluster:mk_nodespecs(
        [{Name, #{apps => NodeApps}} || Name <- NodeNames],
        ClusterOpts
    ),
    Nodes = emqx_cth_cluster:start(Specs),
    [{nodes, Nodes}, {nodespecs, Specs} | Config];
%% `'end'`: stops the nodes recorded by the `init` clause; crashes the teardown
%% if stopping does not return `ok`.
t_crash_restart_recover('end', Config) ->
    Nodes = ?config(nodes, Config),
    ok = emqx_cth_cluster:stop(Nodes).
%% Verifies that message data is preserved when nodes are killed abruptly and
%% restarted while messages are being written.
%%
%% All writes are issued through the first node; the second and third nodes
%% are the ones that get killed and restarted during the run.
t_crash_restart_recover(Config) ->
    Nodes = [N1, N2, N3] = ?config(nodes, Config),
    %% Only the specs of the nodes that will be restarted are needed.
    [_, NS2, NS3] = ?config(nodespecs, Config),
    DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),

    %% Build the test event stream: messages, interleaved with `add_generation`
    %% events, interleaved in turn with the node kill/restart actions.
    NClients = 8,
    NMsgs = 400,
    {Messages, TopicStreams} =
        emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs),
    %% NOTE(review): `{300, S}` / `{1000, S}` presumably set how many items are
    %% taken from `S` per turn, and the boolean presumably controls whether the
    %% result continues after one input runs out -- confirm against
    %% `emqx_utils_stream:interleave/2`.
    Generations = emqx_utils_stream:const(add_generation),
    MessagesAndGenerations = emqx_utils_stream:interleave(
        [{300, Messages}, Generations],
        false
    ),
    Crashes = emqx_utils_stream:list([
        fun() -> kill_restart_node_async(N2, NS2, DBOpts) end,
        fun() -> kill_restart_node_async(N3, NS3, DBOpts) end
    ]),
    Events = emqx_utils_stream:interleave(
        [{1000, MessagesAndGenerations}, Crashes],
        true
    ),

    ?check_trace(
        begin
            %% The DB must open successfully on every node.
            AllOpened = [{ok, ok} || _ <- Nodes],
            ?assertEqual(
                AllOpened,
                erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts])
            ),

            %% Replay the events against N1 only; N2 and N3 get killed and
            %% restarted along the way.
            Writers = emqx_utils_stream:const(N1),
            emqx_ds_test_helpers:apply_stream(?DB, Writers, Events, 0),

            %% Fixed grace period before verification.
            %% NOTE(review): the processes spawned by kill_restart_node_async/3
            %% are not awaited explicitly here -- confirm 5 s is always enough
            %% for the restarted nodes to come back.
            timer:sleep(5000),

            %% Every message of every topic stream must be readable.
            emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
        end,
        []
    ).
%% Runs kill_restart_node/3 in a freshly spawned process linked to the caller
%% and returns that process's pid without waiting for it to finish. Because of
%% the link, a failure in the spawned process propagates to the caller.
%% The MFA form of spawn requires kill_restart_node/3 to be exported.
kill_restart_node_async(Node, Spec, DBOpts) ->
    Args = [Node, Spec, DBOpts],
    erlang:spawn_link(?MODULE, kill_restart_node, Args).
%% Kills `Node`, restarts it from `Spec`, then reopens the test DB on it with
%% `DBOpts`. Returns `ok`; crashes with `badmatch` if the kill or the DB open
%% does not return `ok`.
kill_restart_node(Node, Spec, DBOpts) ->
    ok = emqx_cth_peer:kill(Node),
    %% The result of the restart is deliberately discarded.
    _ = emqx_cth_cluster:restart(Spec),
    OpenArgs = [?DB, DBOpts],
    ok = erpc:call(Node, emqx_ds, open_db, OpenArgs).
%%
shard_server_info(Node, DB, Shard, Site, Info) ->

View File

@ -188,12 +188,14 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
apply_stream(DB, NodeStream, Stream, N + 1);
[add_generation | Stream] ->
%% FIXME:
?tp(notice, test_add_generation, #{}),
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
?ON(Node, emqx_ds:add_generation(DB)),
apply_stream(DB, NodeStream, Stream, N);
[{Node, Operation, Arg} | Stream] when
Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
Operation =:= join_db_site;
Operation =:= leave_db_site;
Operation =:= assign_db_sites
->
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
%% Apply the transition.