diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl
index ef99a0376..1fc1594fc 100644
--- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl
+++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl
@@ -599,6 +599,9 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     ok.
 
+suite() ->
+    [{timetrap, 50_000}].
+
 init_per_testcase(TC, Config) ->
     Apps = emqx_cth_suite:start(
         [emqx_durable_storage, emqx_ds_backends],
diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 068842709..92ceb2e16 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -29,15 +29,12 @@
     emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
 ).
 
-opts() ->
-    opts(#{}).
-
-opts(Overrides) ->
+opts(Config, Overrides) ->
+    Layout = ?config(layout, Config),
     maps:merge(
         #{
             backend => builtin_raft,
-            %% storage => {emqx_ds_storage_reference, #{}},
-            storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
+            storage => Layout,
             n_shards => 16,
             n_sites => 1,
             replication_factor => 3,
@@ -58,7 +55,7 @@ appspec(emqx_durable_storage) ->
 
 t_metadata(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
-        work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
+        work_dir => ?config(work_dir, Config)
     }),
     [{apps, Apps} | Config];
 t_metadata('end', Config) ->
@@ -108,7 +105,7 @@ t_replication_transfers_snapshots(init, Config) ->
             {t_replication_transfers_snapshots2, #{apps => Apps}},
             {t_replication_transfers_snapshots3, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     Nodes = emqx_cth_cluster:start(NodeSpecs),
     [{nodes, Nodes}, {specs, NodeSpecs} | Config];
@@ -125,9 +122,10 @@ t_replication_transfers_snapshots(Config) ->
     Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
     _Specs = [_, SpecOffline | _] = ?config(specs, Config),
     ?check_trace(
+        #{timetrap => 30_000},
         begin
             %% Initialize DB on all nodes and wait for it to be online.
-            Opts = opts(#{n_shards => 1, n_sites => 3}),
+            Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
             ?assertEqual(
                 [{ok, ok} || _ <- Nodes],
                 erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@@ -139,8 +137,11 @@ t_replication_transfers_snapshots(Config) ->
             ),
 
             %% Stop the DB on the "offline" node.
-            ok = emqx_cth_cluster:stop_node(NodeOffline),
-            _ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0),
+            ?wait_async_action(
+                ok = emqx_cth_cluster:stop_node(NodeOffline),
+                #{?snk_kind := ds_ra_state_enter, state := leader},
+                5_000
+            ),
 
             %% Fill the storage with messages and few additional generations.
             emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
@@ -153,9 +154,10 @@ t_replication_transfers_snapshots(Config) ->
                     ?snk_meta := #{node := NodeOffline}
                 })
             ),
-            ?assertEqual(
-                ok,
-                erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
+
+            ok = ?ON(
+                NodeOffline,
+                emqx_ds:open_db(?DB, opts(Config, #{}))
             ),
 
             %% Trigger storage operation and wait the replica to be restored.
@@ -183,7 +185,7 @@ t_rebalance(init, Config) ->
             {t_rebalance3, #{apps => Apps}},
             {t_rebalance4, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     [{nodes, Nodes} | Config];
 t_rebalance('end', Config) ->
@@ -206,7 +208,7 @@ t_rebalance(Config) ->
         begin
             Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
             %% 1. Initialize DB on the first node.
-            Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
+            Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
             [
                 ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
              || Node <- Nodes
@@ -316,7 +318,7 @@ t_join_leave_errors(init, Config) ->
             {t_join_leave_errors1, #{apps => Apps}},
             {t_join_leave_errors2, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     [{nodes, Nodes} | Config];
 t_join_leave_errors('end', Config) ->
@@ -327,7 +329,7 @@ t_join_leave_errors(Config) ->
     %% join/leave operations are reported correctly.
     [N1, N2] = ?config(nodes, Config),
 
-    Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
+    Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
 
     ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
     ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
@@ -385,7 +387,7 @@ t_rebalance_chaotic_converges(init, Config) ->
             {t_rebalance_chaotic_converges2, #{apps => Apps}},
             {t_rebalance_chaotic_converges3, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     [{nodes, Nodes} | Config];
 t_rebalance_chaotic_converges('end', Config) ->
@@ -411,7 +413,7 @@ t_rebalance_chaotic_converges(Config) ->
             ct:pal("Sites: ~p~n", [Sites]),
 
             %% Initialize DB on first two nodes.
-            Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
+            Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}),
 
             %% Open DB:
             ?assertEqual(
@@ -482,7 +484,7 @@ t_rebalance_offline_restarts(init, Config) ->
             {t_rebalance_offline_restarts2, #{apps => Apps}},
             {t_rebalance_offline_restarts3, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     Nodes = emqx_cth_cluster:start(Specs),
     [{nodes, Nodes}, {nodespecs, Specs} | Config];
@@ -498,7 +500,7 @@ t_rebalance_offline_restarts(Config) ->
     _Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
 
     %% Initialize DB on all 3 nodes.
-    Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
+    Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
     ?assertEqual(
         [{ok, ok} || _ <- Nodes],
         erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@@ -544,7 +546,7 @@ t_drop_generation(Config) ->
             {t_drop_generation3, #{apps => Apps}}
         ],
         #{
-            work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
+            work_dir => ?config(work_dir, Config)
         }
     ),
 
@@ -552,7 +554,7 @@ t_drop_generation(Config) ->
     ?check_trace(
         try
             %% Initialize DB on all 3 nodes.
-            Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
+            Opts = opts(Config, #{n_shards => 1, n_sites => 3, replication_factor => 3}),
             ?assertEqual(
                 [{ok, ok} || _ <- Nodes],
                 erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@@ -614,21 +616,21 @@ t_drop_generation(Config) ->
 
 t_error_mapping_replication_layer(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
-        work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
+        work_dir => ?config(work_dir, Config)
     }),
     [{apps, Apps} | Config];
 t_error_mapping_replication_layer('end', Config) ->
     emqx_cth_suite:stop(?config(apps, Config)),
     Config.
 
-t_error_mapping_replication_layer(_Config) ->
+t_error_mapping_replication_layer(Config) ->
     %% This checks that the replication layer maps recoverable errors correctly.
     ok = emqx_ds_test_helpers:mock_rpc(),
     ok = snabbkaffe:start_trace(),
 
     DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+    ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
     [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
 
     TopicFilter = emqx_topic:words(<<"foo/#">>),
@@ -695,7 +697,7 @@ t_error_mapping_replication_layer(_Config) ->
     Results2 = lists:map(
         fun(Iter) ->
             case emqx_ds:next(DB, Iter, _BatchSize = 42) of
-                Ok = {ok, _Iter, [_ | _]} ->
+                Ok = {ok, _Iter, _} ->
                     Ok;
                 Error = {error, recoverable, {badrpc, _}} ->
                     Error;
@@ -716,20 +718,20 @@
 %% problems.
 t_store_batch_fail(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
-        work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
+        work_dir => ?config(work_dir, Config)
    }),
    [{apps, Apps} | Config];
 t_store_batch_fail('end', Config) ->
     emqx_cth_suite:stop(?config(apps, Config)),
     Config.
 
-t_store_batch_fail(_Config) ->
+t_store_batch_fail(Config) ->
     ?check_trace(
         #{timetrap => 15_000},
         try
             meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
             DB = ?FUNCTION_NAME,
-            ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
             %% Success:
             Batch1 = [
                 message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
@@ -768,7 +770,7 @@ t_store_batch_fail(_Config) ->
             ),
             meck:unload(ra),
             ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
-            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
+            lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 0))
         after
             meck:unload()
         end,
@@ -803,7 +805,7 @@ t_crash_restart_recover(init, Config) ->
             {t_crash_stop_recover2, #{apps => Apps}},
             {t_crash_stop_recover3, #{apps => Apps}}
         ],
-        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+        #{work_dir => ?config(work_dir, Config)}
     ),
     Nodes = emqx_cth_cluster:start(Specs),
     [{nodes, Nodes}, {nodespecs, Specs} | Config];
@@ -815,7 +817,7 @@ t_crash_restart_recover(Config) ->
    %% correctly preserved.
    Nodes = [N1, N2, N3] = ?config(nodes, Config),
    _Specs = [_, NS2, NS3] = ?config(nodespecs, Config),
-    DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),
+    DBOpts = opts(Config, #{n_shards => 16, n_sites => 3, replication_factor => 3}),
 
    %% Prepare test event stream.
    NMsgs = 400,
@@ -856,7 +858,10 @@ t_crash_restart_recover(Config) ->
            MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
            {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
            {timeout, Events} = snabbkaffe:receive_events(SubRef),
-            LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
+            LostMessages = [
+                emqx_ds_test_helpers:message_canonical_form(M)
+             || #{batch := Messages} <- Events, M <- Messages
+            ],
            ct:pal("Some messages were lost: ~p", [LostMessages]),
            ?assert(length(LostMessages) < NMsgs div 20),
 
@@ -876,8 +881,16 @@ t_crash_restart_recover(Config) ->
            %% Does any messages were lost unexpectedly?
            {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
            ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
-            MissingMessages = ExpectedMessages -- DSMessages,
-            ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
+            MissingMessages = emqx_ds_test_helpers:message_set_subtract(
+                ExpectedMessages, DSMessages
+            ),
+            ?defer_assert(
+                ?assertEqual(
+                    [],
+                    emqx_ds_test_helpers:sublist(MissingMessages -- LostMessages),
+                    emqx_ds_test_helpers:sublist(DSMessages)
+                )
+            )
        end,
        lists:foreach(VerifyClient, TopicStreams)
    end,
@@ -984,12 +997,35 @@ sample(N, List) ->
 suite() ->
     [{timetrap, {seconds, 60}}].
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [{group, Grp} || {Grp, _} <- groups()].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {bitfield_lts, TCs},
+        {skipstream_lts, TCs}
+    ].
+
+init_per_group(Group, Config) ->
+    LayoutConf =
+        case Group of
+            skipstream_lts ->
+                {emqx_ds_storage_skipstream_lts, #{with_guid => true}};
+            bitfield_lts ->
+                {emqx_ds_storage_bitfield_lts, #{}}
+        end,
+    [{layout, LayoutConf} | Config].
+
+end_per_group(_Group, Config) ->
+    Config.
 
 init_per_testcase(TCName, Config0) ->
-    Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
-    Config.
+    Config1 = [{work_dir, emqx_cth_suite:work_dir(TCName, Config0)} | Config0],
+    emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config1).
 
 end_per_testcase(TCName, Config) ->
     ok = snabbkaffe:stop(),
-    emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
+    Result = emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config),
+    emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
+    Result.