test(dsrepl): Generalize tests to use different storage layouts

This commit is contained in:
ieQu1 2024-07-01 01:05:10 +02:00
parent 8c5e4a2376
commit b68ebb9a73
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 80 additions and 41 deletions

View File

@ -599,6 +599,9 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok.
suite() ->
[{timetrap, 50_000}].
init_per_testcase(TC, Config) ->
Apps = emqx_cth_suite:start(
[emqx_durable_storage, emqx_ds_backends],

View File

@ -29,15 +29,12 @@
emqx_ds_test_helpers:on(NODES, fun() -> BODY end)
).
opts() ->
opts(#{}).
opts(Overrides) ->
opts(Config, Overrides) ->
Layout = ?config(layout, Config),
maps:merge(
#{
backend => builtin_raft,
%% storage => {emqx_ds_storage_reference, #{}},
storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
storage => Layout,
n_shards => 16,
n_sites => 1,
replication_factor => 3,
@ -58,7 +55,7 @@ appspec(emqx_durable_storage) ->
t_metadata(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_metadata('end', Config) ->
@ -108,7 +105,7 @@ t_replication_transfers_snapshots(init, Config) ->
{t_replication_transfers_snapshots2, #{apps => Apps}},
{t_replication_transfers_snapshots3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
[{nodes, Nodes}, {specs, NodeSpecs} | Config];
@ -125,9 +122,10 @@ t_replication_transfers_snapshots(Config) ->
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
?check_trace(
#{timetrap => 30_000},
begin
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(#{n_shards => 1, n_sites => 3}),
Opts = opts(Config, #{n_shards => 1, n_sites => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -139,8 +137,11 @@ t_replication_transfers_snapshots(Config) ->
),
%% Stop the DB on the "offline" node.
ok = emqx_cth_cluster:stop_node(NodeOffline),
_ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0),
?wait_async_action(
ok = emqx_cth_cluster:stop_node(NodeOffline),
#{?snk_kind := ds_ra_state_enter, state := leader},
5_000
),
%% Fill the storage with messages and few additional generations.
emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
@ -153,9 +154,10 @@ t_replication_transfers_snapshots(Config) ->
?snk_meta := #{node := NodeOffline}
})
),
?assertEqual(
ok,
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
ok = ?ON(
NodeOffline,
emqx_ds:open_db(?DB, opts(Config, #{}))
),
%% Trigger storage operation and wait the replica to be restored.
@ -183,7 +185,7 @@ t_rebalance(init, Config) ->
{t_rebalance3, #{apps => Apps}},
{t_rebalance4, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance('end', Config) ->
@ -206,7 +208,7 @@ t_rebalance(Config) ->
begin
Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
%% 1. Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
[
?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
|| Node <- Nodes
@ -316,7 +318,7 @@ t_join_leave_errors(init, Config) ->
{t_join_leave_errors1, #{apps => Apps}},
{t_join_leave_errors2, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_join_leave_errors('end', Config) ->
@ -327,7 +329,7 @@ t_join_leave_errors(Config) ->
%% join/leave operations are reported correctly.
[N1, N2] = ?config(nodes, Config),
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
@ -385,7 +387,7 @@ t_rebalance_chaotic_converges(init, Config) ->
{t_rebalance_chaotic_converges2, #{apps => Apps}},
{t_rebalance_chaotic_converges3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance_chaotic_converges('end', Config) ->
@ -411,7 +413,7 @@ t_rebalance_chaotic_converges(Config) ->
ct:pal("Sites: ~p~n", [Sites]),
%% Initialize DB on first two nodes.
Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}),
%% Open DB:
?assertEqual(
@ -482,7 +484,7 @@ t_rebalance_offline_restarts(init, Config) ->
{t_rebalance_offline_restarts2, #{apps => Apps}},
{t_rebalance_offline_restarts3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
@ -498,7 +500,7 @@ t_rebalance_offline_restarts(Config) ->
_Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -544,7 +546,7 @@ t_drop_generation(Config) ->
{t_drop_generation3, #{apps => Apps}}
],
#{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}
),
@ -552,7 +554,7 @@ t_drop_generation(Config) ->
?check_trace(
try
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 1, n_sites => 3, replication_factor => 3}),
Opts = opts(Config, #{n_shards => 1, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
@ -614,21 +616,21 @@ t_drop_generation(Config) ->
t_error_mapping_replication_layer(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_error_mapping_replication_layer('end', Config) ->
emqx_cth_suite:stop(?config(apps, Config)),
Config.
t_error_mapping_replication_layer(_Config) ->
t_error_mapping_replication_layer(Config) ->
%% This checks that the replication layer maps recoverable errors correctly.
ok = emqx_ds_test_helpers:mock_rpc(),
ok = snabbkaffe:start_trace(),
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
[Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
TopicFilter = emqx_topic:words(<<"foo/#">>),
@ -695,7 +697,7 @@ t_error_mapping_replication_layer(_Config) ->
Results2 = lists:map(
fun(Iter) ->
case emqx_ds:next(DB, Iter, _BatchSize = 42) of
Ok = {ok, _Iter, [_ | _]} ->
Ok = {ok, _Iter, _} ->
Ok;
Error = {error, recoverable, {badrpc, _}} ->
Error;
@ -716,20 +718,20 @@ t_error_mapping_replication_layer(_Config) ->
%% problems.
t_store_batch_fail(init, Config) ->
Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)
work_dir => ?config(work_dir, Config)
}),
[{apps, Apps} | Config];
t_store_batch_fail('end', Config) ->
emqx_cth_suite:stop(?config(apps, Config)),
Config.
t_store_batch_fail(_Config) ->
t_store_batch_fail(Config) ->
?check_trace(
#{timetrap => 15_000},
try
meck:new(emqx_ds_storage_layer, [passthrough, no_history]),
DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})),
?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
%% Success:
Batch1 = [
message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1),
@ -768,7 +770,7 @@ t_store_batch_fail(_Config) ->
),
meck:unload(ra),
?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})),
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1))
lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 0))
after
meck:unload()
end,
@ -803,7 +805,7 @@ t_crash_restart_recover(init, Config) ->
{t_crash_stop_recover2, #{apps => Apps}},
{t_crash_stop_recover3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
#{work_dir => ?config(work_dir, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
@ -815,7 +817,7 @@ t_crash_restart_recover(Config) ->
%% correctly preserved.
Nodes = [N1, N2, N3] = ?config(nodes, Config),
_Specs = [_, NS2, NS3] = ?config(nodespecs, Config),
DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}),
DBOpts = opts(Config, #{n_shards => 16, n_sites => 3, replication_factor => 3}),
%% Prepare test event stream.
NMsgs = 400,
@ -856,7 +858,10 @@ t_crash_restart_recover(Config) ->
MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
{ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
{timeout, Events} = snabbkaffe:receive_events(SubRef),
LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
LostMessages = [
emqx_ds_test_helpers:message_canonical_form(M)
|| #{batch := Messages} <- Events, M <- Messages
],
ct:pal("Some messages were lost: ~p", [LostMessages]),
?assert(length(LostMessages) < NMsgs div 20),
@ -876,8 +881,16 @@ t_crash_restart_recover(Config) ->
%% Does any messages were lost unexpectedly?
{_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
MissingMessages = ExpectedMessages -- DSMessages,
?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
MissingMessages = emqx_ds_test_helpers:message_set_subtract(
ExpectedMessages, DSMessages
),
?defer_assert(
?assertEqual(
[],
emqx_ds_test_helpers:sublist(MissingMessages -- LostMessages),
emqx_ds_test_helpers:sublist(DSMessages)
)
)
end,
lists:foreach(VerifyClient, TopicStreams)
end,
@ -984,12 +997,35 @@ sample(N, List) ->
suite() -> [{timetrap, {seconds, 60}}].
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[{group, Grp} || {Grp, _} <- groups()].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{bitfield_lts, TCs},
{skipstream_lts, TCs}
].
init_per_group(Group, Config) ->
LayoutConf =
case Group of
skipstream_lts ->
{emqx_ds_storage_skipstream_lts, #{with_guid => true}};
bitfield_lts ->
{emqx_ds_storage_bitfield_lts, #{}}
end,
[{layout, LayoutConf} | Config].
end_per_group(_Group, Config) ->
Config.
init_per_testcase(TCName, Config0) ->
Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
Config.
Config1 = [{work_dir, emqx_cth_suite:work_dir(TCName, Config0)} | Config0],
emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config1).
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
Result = emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config),
emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
Result.