From 8c5e4a237601ce5de17448b412080ce81fe60618 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 00:55:41 +0200 Subject: [PATCH] test(ds): Generalize storage layout test suite for different layouts --- ...E.erl => emqx_ds_storage_layout_SUITE.erl} | 140 ++++++++++-------- 1 file changed, 78 insertions(+), 62 deletions(-) rename apps/emqx_durable_storage/test/{emqx_ds_storage_bitfield_lts_SUITE.erl => emqx_ds_storage_layout_SUITE.erl} (85%) diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl similarity index 85% rename from apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl index 11b6109e8..e0531dad0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ds_storage_bitfield_lts_SUITE). +-module(emqx_ds_storage_layout_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -23,23 +23,34 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/assert.hrl"). +-define(FUTURE, (1 bsl 64 - 1)). + -define(SHARD, shard(?FUNCTION_NAME)). --define(DEFAULT_CONFIG, #{ +-define(DB_CONFIG(CONFIG), #{ backend => builtin_local, - storage => {emqx_ds_storage_bitfield_lts, #{}}, + storage => ?config(layout, CONFIG), n_shards => 1 }). --define(COMPACT_CONFIG, #{ - backend => builtin_local, - storage => - {emqx_ds_storage_bitfield_lts, #{ - bits_per_wildcard_level => 8 - }}, - n_shards => 1, - replication_factor => 1 -}). +all() -> + [ + {group, bitfield_lts}, + {group, skipstream_lts} + ]. + +init_per_group(Group, Config) -> + LayoutConf = + case Group of + skipstream_lts -> + {emqx_ds_storage_skipstream_lts, #{with_guid => true}}; + bitfield_lts -> + {emqx_ds_storage_bitfield_lts, #{}} + end, + [{layout, LayoutConf} | Config]. + +end_per_group(_Group, Config) -> + Config. %% Smoke test of store function t_store(_Config) -> @@ -53,7 +64,7 @@ t_store(_Config) -> payload = Payload, timestamp = PublishedAt }, - ?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})). + ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])). %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> @@ -61,15 +72,17 @@ t_iterate(_Config) -> Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} + make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch), %% Iterate through individual topics: [ begin - [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), + [{Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), + ct:pal("Streams for ~p: {~p, ~p}", [Topic, Rank, Stream]), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), + ct:pal("Iterator for ~p: ~p", [Topic, It]), {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next( ?SHARD, It, 100, emqx_ds:timestamp_us() ), @@ -91,10 +104,10 @@ t_delete(_Config) -> Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} + make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch), %% Iterate through topics: StartTime = 0, @@ -109,23 +122,21 @@ t_delete(_Config) -> Messages = [Msg || {_DSKey, Msg} <- replay(?SHARD, TopicFilter, StartTime)], MessagesByTopic = maps:groups_from_list(fun emqx_message:topic/1, Messages), ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}), - ?assertEqual(20, length(Messages)), - - ok. + ?assertEqual(20, length(Messages)). -define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). %% Smoke test that verifies that concrete topics are mapped to %% individual streams, unless there's too many of them. -t_get_streams(_Config) -> +t_get_streams(Config) -> %% Prepare data (without wildcards): Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>], Timestamps = lists:seq(1, 10), Batch = [ - {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} + make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch), GetStream = fun(Topic) -> StartTime = 0, emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) @@ -136,7 +147,7 @@ t_get_streams(_Config) -> [A] = GetStream(<<"a">>), %% Restart shard to make sure trie is persisted and restored: ok = emqx_ds:close_db(?FUNCTION_NAME), - ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)), %% Verify that there are no "ghost streams" for topics that don't %% have any messages: [] = GetStream(<<"bar/foo">>), @@ -148,11 +159,11 @@ t_get_streams(_Config) -> NewBatch = [ begin B = integer_to_binary(I), - {100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)} + make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>) end || I <- lists:seq(1, 200) ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, NewBatch), %% Check that "foo/bar/baz" topic now appears in two streams: %% "foo/bar/baz" and "foo/bar/+": NewStreams = lists:sort(GetStream("foo/bar/baz")), @@ -168,7 +179,7 @@ t_get_streams(_Config) -> ?assert(lists:member(A, AllStreams)), ok. -t_new_generation_inherit_trie(_Config) -> +t_new_generation_inherit_trie(Config) -> %% This test checks that we inherit the previous generation's LTS when creating a new %% generation. ?check_trace( @@ -176,25 +187,25 @@ t_new_generation_inherit_trie(_Config) -> %% Create a bunch of topics to be learned in the first generation TS1 = 500, Batch1 = [ - {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))} + make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I)) || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), %% Restart the shard, to verify that LTS is persisted. ok = emqx_ds:close_db(?FUNCTION_NAME), - ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)), %% Store a batch of messages with the same set of topics. TS2 = 1_500, Batch2 = [ - {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))} + make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I)) || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2), %% We should get only two streams for wildcard query, for "foo" and for "bar". ?assertMatch( [_Foo, _Bar], @@ -203,29 +214,30 @@ t_new_generation_inherit_trie(_Config) -> ok end, fun(Trace) -> - ?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)), + ?assertMatch([_], ?of_kind(layout_inherited_lts_trie, Trace)), ok end ), ok. -t_replay(_Config) -> +t_replay(Config) -> %% Create concrete topics: Topics = [<<"foo/bar">>, <<"foo/bar/baz">>], - Timestamps = lists:seq(1, 10_000, 100), + Values = lists:seq(1, 1_000, 100), Batch1 = [ - {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} - || Topic <- Topics, PublishedAt <- Timestamps + make_message(Val, Topic, bin(Val)) + || Topic <- Topics, Val <- Values ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ - {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} - || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] + make_message(Val, make_topic([wildcard, Prefix, suffix, Suffix]), bin(Val)) + || Prefix <- lists:seq(1, 200), Val <- Values, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), + ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2), + timer:sleep(5_000), %% Check various topic filters: - Messages = [M || {_TS, M} <- Batch1 ++ Batch2], + Messages = Batch1 ++ Batch2, %% Missing topics (no ghost messages): ?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)), %% Regular topics: @@ -238,7 +250,7 @@ t_replay(_Config) -> ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)), %% Restart the DB to make sure trie is persisted and restored: ok = emqx_ds:close_db(?FUNCTION_NAME), - ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)), %% Learned wildcard topics: ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])), ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)), @@ -314,6 +326,9 @@ t_non_atomic_store_batch(_Config) -> ). check(Shard, TopicFilter, StartTime, ExpectedMessages) -> + ?tp(notice, ?MODULE_STRING "_check", #{ + shard => Shard, tf => TopicFilter, start_time => StartTime + }), ExpectedFiltered = lists:filter( fun(#message{topic = Topic, timestamp = TS}) -> emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime @@ -325,17 +340,9 @@ check(Shard, TopicFilter, StartTime, ExpectedMessages) -> begin Dump = dump_messages(Shard, TopicFilter, StartTime), verify_dump(TopicFilter, StartTime, Dump), - Missing = ExpectedFiltered -- Dump, - Extras = Dump -- ExpectedFiltered, - ?assertMatch( - #{missing := [], unexpected := []}, - #{ - missing => Missing, - unexpected => Extras, - topic_filter => TopicFilter, - start_time => StartTime - } - ) + emqx_ds_test_helpers:assert_same_set(ExpectedFiltered, Dump, #{ + topic_filter => TopicFilter, start_time => StartTime + }) end, [] ), @@ -362,6 +369,7 @@ verify_dump(TopicFilter, StartTime, Dump) -> dump_messages(Shard, TopicFilter, StartTime) -> Streams = emqx_ds_storage_layer:get_streams(Shard, parse_topic(TopicFilter), StartTime), + ct:pal("Streams for ~p:~n ~p", [TopicFilter, Streams]), lists:flatmap( fun({_Rank, Stream}) -> dump_stream(Shard, Stream, TopicFilter, StartTime) @@ -374,6 +382,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> {ok, Iterator} = emqx_ds_storage_layer:make_iterator( Shard, Stream, parse_topic(TopicFilter), StartTime ), + ct:pal("Iterator for ~p at stream ~p:~n ~p", [TopicFilter, Stream, Iterator]), Loop = fun F(It, 0) -> error({too_many_iterations, It}); @@ -502,24 +511,31 @@ bin(X) -> %% CT callbacks -all() -> emqx_common_test_helpers:all(?MODULE). +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {bitfield_lts, TCs}, + {skipstream_lts, TCs} + ]. + suite() -> [{timetrap, {seconds, 20}}]. init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), + WorkDir = emqx_cth_suite:work_dir(Config), Apps = emqx_cth_suite:start( [emqx_ds_builtin_local], - #{work_dir => emqx_cth_suite:work_dir(Config)} + #{work_dir => WorkDir} ), - [{apps, Apps} | Config]. + [{apps, Apps}, {work_dir, WorkDir} | Config]. end_per_suite(Config) -> Apps = ?config(apps, Config), ok = emqx_cth_suite:stop(Apps), + emqx_cth_suite:clean_work_dir(?config(work_dir, Config)), ok. init_per_testcase(TC, Config) -> - ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG), + ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)), Config. end_per_testcase(TC, _Config) -> @@ -558,7 +574,7 @@ delete(Shard, Iterators, Selector) -> fun(Iterator0, {AccIterators, NAcc}) -> case emqx_ds_storage_layer:delete_next( - Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us() + Shard, Iterator0, Selector, 10, ?FUTURE ) of {ok, end_of_stream} -> @@ -591,7 +607,7 @@ replay(_Shard, []) -> replay(Shard, Iterators) -> {NewIterators0, Messages0} = lists:foldl( fun(Iterator0, {AccIterators, AccMessages}) -> - case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of + case emqx_ds_storage_layer:next(Shard, Iterator0, 10, ?FUTURE) of {ok, end_of_stream} -> {AccIterators, AccMessages}; {ok, _Iterator1, []} ->