test(ds): verify deletions work predictably
This commit is contained in:
parent
109ffe7a70
commit
9f96e0957e
|
@ -18,11 +18,14 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_ds.hrl").
|
||||
-include("../../emqx/include/emqx.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
|
||||
|
||||
-define(FUTURE, (1 bsl 64 - 1)).
|
||||
|
||||
-define(SHARD, shard(?FUNCTION_NAME)).
|
||||
|
@ -66,6 +69,30 @@ t_store(_Config) ->
|
|||
},
|
||||
?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).
|
||||
|
||||
%% Smoke test of applying batch operations
|
||||
t_operations(db_config, _Config) ->
|
||||
#{force_monotonic_timestamps => false}.
|
||||
|
||||
t_operations(_Config) ->
|
||||
Batch1 = [
|
||||
make_message(100, <<"t/1">>, <<"M1">>),
|
||||
make_message(200, <<"t/2">>, <<"M2">>),
|
||||
make_message(300, <<"t/3">>, <<"M3">>)
|
||||
],
|
||||
Batch2 = [
|
||||
make_deletion(200, <<"t/2">>, <<"M2">>),
|
||||
make_deletion(300, <<"t/3">>, '_'),
|
||||
make_deletion(400, <<"t/4">>, '_')
|
||||
],
|
||||
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)),
|
||||
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)),
|
||||
?assertMatch(
|
||||
[
|
||||
#message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>}
|
||||
],
|
||||
dump_messages(?SHARD, <<"t/#">>, 0)
|
||||
).
|
||||
|
||||
%% Smoke test for iteration through a concrete topic
|
||||
t_iterate(_Config) ->
|
||||
%% Prepare data:
|
||||
|
@ -124,8 +151,6 @@ t_delete(_Config) ->
|
|||
?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
|
||||
?assertEqual(20, length(Messages)).
|
||||
|
||||
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
|
||||
|
||||
%% Smoke test that verifies that concrete topics are mapped to
|
||||
%% individual streams, unless there's too many of them.
|
||||
t_get_streams(Config) ->
|
||||
|
@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
|
|||
%% || Topic <- Topics, PublishedAt <- Timestamps
|
||||
%% ].
|
||||
|
||||
%% t_iterate_multigen(_Config) ->
|
||||
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
|
||||
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
|
||||
%% Timestamps = lists:seq(1, 100),
|
||||
%% _ = [
|
||||
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
|
||||
%% || Topic <- Topics, PublishedAt <- Timestamps
|
||||
%% ],
|
||||
%% ?assertEqual(
|
||||
%% lists:sort([
|
||||
%% {Topic, PublishedAt}
|
||||
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
|
||||
%% ]),
|
||||
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
|
||||
%% ),
|
||||
%% ?assertEqual(
|
||||
%% lists:sort([
|
||||
%% {Topic, PublishedAt}
|
||||
%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
|
||||
%% ]),
|
||||
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
|
||||
%% ).
|
||||
|
||||
%% t_iterate_multigen_preserve_restore(_Config) ->
|
||||
%% ReplayID = atom_to_binary(?FUNCTION_NAME),
|
||||
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
|
||||
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
|
||||
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
|
||||
%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
|
||||
%% Timestamps = lists:seq(1, 100),
|
||||
%% TopicFilter = "foo/#",
|
||||
%% TopicsMatching = ["foo/bar", "foo/bar/baz"],
|
||||
%% _ = [
|
||||
%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
|
||||
%% || Topic <- Topics, TS <- Timestamps
|
||||
%% ],
|
||||
%% It0 = iterator(?SHARD, TopicFilter, 0),
|
||||
%% {It1, Res10} = iterate(It0, 10),
|
||||
%% % preserve mid-generation
|
||||
%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
|
||||
%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
|
||||
%% {It3, Res100} = iterate(It2, 88),
|
||||
%% % preserve on the generation boundary
|
||||
%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
|
||||
%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
|
||||
%% {It5, Res200} = iterate(It4, 1000),
|
||||
%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
|
||||
%% ?assertEqual(
|
||||
%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
|
||||
%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
|
||||
%% ),
|
||||
%% ?assertEqual(
|
||||
%% ok,
|
||||
%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
|
||||
%% ),
|
||||
%% ?assertEqual(
|
||||
%% {error, not_found},
|
||||
%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
|
||||
%% ).
|
||||
|
||||
make_message(PublishedAt, Topic, Payload) when is_list(Topic) ->
|
||||
make_message(PublishedAt, list_to_binary(Topic), Payload);
|
||||
make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
|
||||
ID = emqx_guid:gen(),
|
||||
#message{
|
||||
id = ID,
|
||||
from = <<?MODULE_STRING>>,
|
||||
topic = Topic,
|
||||
timestamp = PublishedAt,
|
||||
payload = Payload
|
||||
}.
|
||||
|
||||
make_deletion(Timestamp, Topic, Payload) ->
|
||||
{delete, #message_matcher{
|
||||
from = <<?MODULE_STRING>>,
|
||||
topic = Topic,
|
||||
timestamp = Timestamp,
|
||||
payload = Payload
|
||||
}}.
|
||||
|
||||
make_topic(Tokens = [_ | _]) ->
|
||||
emqx_topic:join([bin(T) || T <- Tokens]).
|
||||
|
||||
|
@ -535,13 +507,23 @@ end_per_suite(Config) ->
|
|||
ok.
|
||||
|
||||
init_per_testcase(TC, Config) ->
|
||||
ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)),
|
||||
ok = emqx_ds:open_db(TC, db_config(TC, Config)),
|
||||
Config.
|
||||
|
||||
end_per_testcase(TC, _Config) ->
|
||||
emqx_ds:drop_db(TC),
|
||||
ok.
|
||||
|
||||
db_config(TC, Config) ->
|
||||
ConfigBase = ?DB_CONFIG(Config),
|
||||
SpecificConfig =
|
||||
try
|
||||
?MODULE:TC(?FUNCTION_NAME, Config)
|
||||
catch
|
||||
error:undef -> #{}
|
||||
end,
|
||||
maps:merge(ConfigBase, SpecificConfig).
|
||||
|
||||
shard(TC) ->
|
||||
{TC, <<"0">>}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue