test(ds): verify deletions work predictably

This commit is contained in:
Andrew Mayorov 2024-07-31 18:43:39 +02:00
parent 58b9ab0210
commit 26ec69d5f4
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed file with 47 additions and 65 deletions

View File

@ -18,11 +18,14 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_ds.hrl").
-include("../../emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
-define(FUTURE, (1 bsl 64 - 1)).
-define(SHARD, shard(?FUNCTION_NAME)).
@ -66,6 +69,30 @@ t_store(_Config) ->
},
?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).
%% Smoke test of applying batch operations
%% Per-testcase DB configuration override for `t_operations/1', picked up
%% via the `?MODULE:TC(db_config, Config)' hook in `db_config/2': disables
%% forced monotonic timestamps for this testcase.
t_operations(db_config, _Config) ->
#{force_monotonic_timestamps => false}.
t_operations(_Config) ->
%% Store three messages on distinct topics at timestamps 100/200/300.
Batch1 = [
make_message(100, <<"t/1">>, <<"M1">>),
make_message(200, <<"t/2">>, <<"M2">>),
make_message(300, <<"t/3">>, <<"M3">>)
],
%% Deletion batch: exact-payload match for t/2, wildcard payload ('_')
%% for t/3, and a deletion for t/4 which was never stored (should be a
%% harmless no-op).
Batch2 = [
make_deletion(200, <<"t/2">>, <<"M2">>),
make_deletion(300, <<"t/3">>, '_'),
make_deletion(400, <<"t/4">>, '_')
],
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)),
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)),
%% After applying the deletions, only the message on t/1 must remain.
?assertMatch(
[
#message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>}
],
dump_messages(?SHARD, <<"t/#">>, 0)
).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
%% Prepare data:
@ -124,8 +151,6 @@ t_delete(_Config) ->
?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
?assertEqual(20, length(Messages)).
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
%% Smoke test that verifies that concrete topics are mapped to
%% individual streams, unless there's too many of them.
t_get_streams(Config) ->
@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ].
%% t_iterate_multigen(_Config) ->
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% _ = [
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ],
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
%% ).
%% t_iterate_multigen_preserve_restore(_Config) ->
%% ReplayID = atom_to_binary(?FUNCTION_NAME),
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% TopicFilter = "foo/#",
%% TopicsMatching = ["foo/bar", "foo/bar/baz"],
%% _ = [
%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
%% || Topic <- Topics, TS <- Timestamps
%% ],
%% It0 = iterator(?SHARD, TopicFilter, 0),
%% {It1, Res10} = iterate(It0, 10),
%% % preserve mid-generation
%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It3, Res100} = iterate(It2, 88),
%% % preserve on the generation boundary
%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It5, Res200} = iterate(It4, 1000),
%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
%% ?assertEqual(
%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
%% ),
%% ?assertEqual(
%% ok,
%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
%% ),
%% ?assertEqual(
%% {error, not_found},
%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
%% ).
%% Build an `#message{}' record for the given publish timestamp, topic and
%% payload. A list topic is normalized to a binary before construction.
make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
    #message{
        id = emqx_guid:gen(),
        from = <<?MODULE_STRING>>,
        topic = Topic,
        timestamp = PublishedAt,
        payload = Payload
    };
make_message(PublishedAt, Topic, Payload) when is_list(Topic) ->
    make_message(PublishedAt, list_to_binary(Topic), Payload).
%% Build a `{delete, Matcher}' batch operation matching messages from this
%% module with the given timestamp, topic and payload pattern.
make_deletion(Timestamp, Topic, Payload) ->
    Matcher = #message_matcher{
        from = <<?MODULE_STRING>>,
        topic = Topic,
        timestamp = Timestamp,
        payload = Payload
    },
    {delete, Matcher}.
%% Join a non-empty list of topic tokens into a topic, converting each
%% token to a binary via `bin/1' first.
make_topic([_ | _] = Tokens) ->
    emqx_topic:join(lists:map(fun bin/1, Tokens)).
@ -535,13 +507,23 @@ end_per_suite(Config) ->
ok.
%% Open a DB named after the testcase before each test runs.
%% NOTE(review): this rendered diff shows both the removed line
%% (`?DB_CONFIG(Config)') and its replacement (`db_config(TC, Config)')
%% without +/- markers; presumably only the `db_config/2' call is current —
%% confirm against the repository before relying on this text.
init_per_testcase(TC, Config) ->
ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)),
ok = emqx_ds:open_db(TC, db_config(TC, Config)),
Config.
%% Drop the per-testcase DB after each test; the return value of
%% `drop_db/1' is not checked, so cleanup problems do not fail the test.
end_per_testcase(TC, _Config) ->
emqx_ds:drop_db(TC),
ok.
%% Compute the DB config for testcase `TC': start from the suite-wide
%% `?DB_CONFIG' and merge in any per-testcase overrides provided by a
%% `TC(db_config, Config)' clause. Testcases without such a clause raise
%% `error:undef', which is treated as "no overrides".
db_config(TC, Config) ->
    Overrides =
        try ?MODULE:TC(?FUNCTION_NAME, Config) of
            Map -> Map
        catch
            error:undef -> #{}
        end,
    maps:merge(?DB_CONFIG(Config), Overrides).
%% Shard identifier used by a testcase: the testcase name paired with the
%% fixed shard id `<<"0">>'.
shard(TestCase) ->
    {TestCase, <<"0">>}.