From 8db70b5bbccf4f03060e2a7feaffc752d7574f0c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 16:43:33 +0200 Subject: [PATCH] test(dsrepl): add crash-restart-recover testcase That verifies nothing is lost in the event of abrupt node failures. --- .../test/emqx_ds_replication_SUITE.erl | 71 +++++++++++++++++++ .../test/emqx_ds_test_helpers.erl | 6 +- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 2276dfb03..a6ca86f7e 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -794,6 +794,77 @@ t_store_batch_fail(_Config) -> ] ). +t_crash_restart_recover(init, Config) -> + Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft], + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {t_crash_stop_recover1, #{apps => Apps}}, + {t_crash_stop_recover2, #{apps => Apps}}, + {t_crash_stop_recover3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + [{nodes, Nodes}, {nodespecs, Specs} | Config]; +t_crash_restart_recover('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_crash_restart_recover(Config) -> + %% This testcase verifies that in the event of abrupt site failure message data is + %% correctly preserved. + Nodes = [N1, N2, N3] = ?config(nodes, Config), + _Specs = [_, NS2, NS3] = ?config(nodespecs, Config), + DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}), + + %% Prepare test event stream. + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, _NClients = 8, _NMsgs = 400 + ), + Stream1 = emqx_utils_stream:interleave( + [ + {300, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), + Stream = emqx_utils_stream:interleave( + [ + {1000, Stream1}, + emqx_utils_stream:list([ + fun() -> kill_restart_node_async(N2, NS2, DBOpts) end, + fun() -> kill_restart_node_async(N3, NS3, DBOpts) end + ]) + ], + true + ), + + ?check_trace( + begin + %% Initialize DB on all nodes. + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts]) + ), + + %% Apply the test events, including simulated node crashes. + NodeStream = emqx_utils_stream:const(N1), + emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), + timer:sleep(5000), + + %% Verify that all the data is there. + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] + ). + +kill_restart_node_async(Node, Spec, DBOpts) -> + erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]). + +kill_restart_node(Node, Spec, DBOpts) -> + ok = emqx_cth_peer:kill(Node), + _ = emqx_cth_cluster:restart(Spec), + ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]). + %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index c8162e42a..0b6b634c5 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -188,12 +188,14 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), apply_stream(DB, NodeStream, Stream, N + 1); [add_generation | Stream] -> - %% FIXME: + ?tp(notice, test_add_generation, #{}), [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), ?ON(Node, emqx_ds:add_generation(DB)), apply_stream(DB, NodeStream, Stream, N); [{Node, Operation, Arg} | Stream] when - Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + Operation =:= join_db_site; + Operation =:= leave_db_site; + Operation =:= assign_db_sites -> ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), %% Apply the transition.