diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl index e0531dad0..e0fea7875 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl @@ -18,11 +18,14 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_ds.hrl"). -include("../../emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/assert.hrl"). +-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). + -define(FUTURE, (1 bsl 64 - 1)). -define(SHARD, shard(?FUNCTION_NAME)). @@ -66,6 +69,30 @@ t_store(_Config) -> }, ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])). +%% Smoke test of applying batch operations +t_operations(db_config, _Config) -> + #{force_monotonic_timestamps => false}. + +t_operations(_Config) -> + Batch1 = [ + make_message(100, <<"t/1">>, <<"M1">>), + make_message(200, <<"t/2">>, <<"M2">>), + make_message(300, <<"t/3">>, <<"M3">>) + ], + Batch2 = [ + make_deletion(200, <<"t/2">>, <<"M2">>), + make_deletion(300, <<"t/3">>, '_'), + make_deletion(400, <<"t/4">>, '_') + ], + ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)), + ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)), + ?assertMatch( + [ + #message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>} + ], + dump_messages(?SHARD, <<"t/#">>, 0) + ). + %% Smoke test for iteration through a concrete topic t_iterate(_Config) -> %% Prepare data: @@ -124,8 +151,6 @@ t_delete(_Config) -> ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}), ?assertEqual(20, length(Messages)). --define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))). - %% Smoke test that verifies that concrete topics are mapped to %% individual streams, unless there's too many of them. t_get_streams(Config) -> @@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> %% || Topic <- Topics, PublishedAt <- Timestamps %% ]. -%% t_iterate_multigen(_Config) -> -%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), -%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), -%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG), -%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], -%% Timestamps = lists:seq(1, 100), -%% _ = [ -%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) -%% || Topic <- Topics, PublishedAt <- Timestamps -%% ], -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) -%% ), -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100) -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)]) -%% ). - -%% t_iterate_multigen_preserve_restore(_Config) -> -%% ReplayID = atom_to_binary(?FUNCTION_NAME), -%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG), -%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG), -%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG), -%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"], -%% Timestamps = lists:seq(1, 100), -%% TopicFilter = "foo/#", -%% TopicsMatching = ["foo/bar", "foo/bar/baz"], -%% _ = [ -%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS})) -%% || Topic <- Topics, TS <- Timestamps -%% ], -%% It0 = iterator(?SHARD, TopicFilter, 0), -%% {It1, Res10} = iterate(It0, 10), -%% % preserve mid-generation -%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID), -%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), -%% {It3, Res100} = iterate(It2, 88), -%% % preserve on the generation boundary -%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID), -%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID), -%% {It5, Res200} = iterate(It4, 1000), -%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)), -%% ?assertEqual( -%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]), -%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200]) -%% ), -%% ?assertEqual( -%% ok, -%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID) -%% ), -%% ?assertEqual( -%% {error, not_found}, -%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID) -%% ). - make_message(PublishedAt, Topic, Payload) when is_list(Topic) -> make_message(PublishedAt, list_to_binary(Topic), Payload); make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> ID = emqx_guid:gen(), #message{ id = ID, + from = <>, topic = Topic, timestamp = PublishedAt, payload = Payload }. +make_deletion(Timestamp, Topic, Payload) -> + {delete, #message_matcher{ + from = <>, + topic = Topic, + timestamp = Timestamp, + payload = Payload + }}. + make_topic(Tokens = [_ | _]) -> emqx_topic:join([bin(T) || T <- Tokens]). @@ -535,13 +507,23 @@ end_per_suite(Config) -> ok. init_per_testcase(TC, Config) -> - ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)), + ok = emqx_ds:open_db(TC, db_config(TC, Config)), Config. end_per_testcase(TC, _Config) -> emqx_ds:drop_db(TC), ok. +db_config(TC, Config) -> + ConfigBase = ?DB_CONFIG(Config), + SpecificConfig = + try + ?MODULE:TC(?FUNCTION_NAME, Config) + catch + error:undef -> #{} + end, + maps:merge(ConfigBase, SpecificConfig). + shard(TC) -> {TC, <<"0">>}.