test(ds): Generalize storage layout test suite for different layouts

This commit is contained in:
ieQu1 2024-07-01 00:55:41 +02:00
parent 086e7256f5
commit 8c5e4a2376
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 78 additions and 62 deletions

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_bitfield_lts_SUITE).
-module(emqx_ds_storage_layout_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -23,23 +23,34 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(FUTURE, (1 bsl 64 - 1)).
-define(SHARD, shard(?FUNCTION_NAME)).
-define(DEFAULT_CONFIG, #{
-define(DB_CONFIG(CONFIG), #{
backend => builtin_local,
storage => {emqx_ds_storage_bitfield_lts, #{}},
storage => ?config(layout, CONFIG),
n_shards => 1
}).
-define(COMPACT_CONFIG, #{
backend => builtin_local,
storage =>
{emqx_ds_storage_bitfield_lts, #{
bits_per_wildcard_level => 8
}},
n_shards => 1,
replication_factor => 1
}).
all() ->
[
{group, bitfield_lts},
{group, skipstream_lts}
].
init_per_group(Group, Config) ->
LayoutConf =
case Group of
skipstream_lts ->
{emqx_ds_storage_skipstream_lts, #{with_guid => true}};
bitfield_lts ->
{emqx_ds_storage_bitfield_lts, #{}}
end,
[{layout, LayoutConf} | Config].
end_per_group(_Group, Config) ->
Config.
%% Smoke test of store function
t_store(_Config) ->
@ -53,7 +64,7 @@ t_store(_Config) ->
payload = Payload,
timestamp = PublishedAt
},
?assertMatch(ok, emqx_ds_storage_layer:store_batch(?SHARD, [{PublishedAt, Msg}], #{})).
?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).
%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
@ -61,15 +72,17 @@ t_iterate(_Config) ->
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
%% Iterate through individual topics:
[
begin
[{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
[{Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
ct:pal("Streams for ~p: {~p, ~p}", [Topic, Rank, Stream]),
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
ct:pal("Iterator for ~p: ~p", [Topic, It]),
{ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(
?SHARD, It, 100, emqx_ds:timestamp_us()
),
@ -91,10 +104,10 @@ t_delete(_Config) ->
Topics = [<<"foo/bar">>, TopicToDelete, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
%% Iterate through topics:
StartTime = 0,
@ -109,23 +122,21 @@ t_delete(_Config) ->
Messages = [Msg || {_DSKey, Msg} <- replay(?SHARD, TopicFilter, StartTime)],
MessagesByTopic = maps:groups_from_list(fun emqx_message:topic/1, Messages),
?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
?assertEqual(20, length(Messages)),
ok.
?assertEqual(20, length(Messages)).
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
%% Smoke test that verifies that concrete topics are mapped to
%% individual streams, unless there's too many of them.
t_get_streams(_Config) ->
t_get_streams(Config) ->
%% Prepare data (without wildcards):
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>, <<"a">>],
Timestamps = lists:seq(1, 10),
Batch = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))
|| Topic <- Topics, PublishedAt <- Timestamps
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch),
GetStream = fun(Topic) ->
StartTime = 0,
emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime)
@ -136,7 +147,7 @@ t_get_streams(_Config) ->
[A] = GetStream(<<"a">>),
%% Restart shard to make sure trie is persisted and restored:
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Verify that there are no "ghost streams" for topics that don't
%% have any messages:
[] = GetStream(<<"bar/foo">>),
@ -148,11 +159,11 @@ t_get_streams(_Config) ->
NewBatch = [
begin
B = integer_to_binary(I),
{100, make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)}
make_message(100, <<"foo/bar/", B/binary>>, <<"filler", B/binary>>)
end
|| I <- lists:seq(1, 200)
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, NewBatch),
%% Check that "foo/bar/baz" topic now appears in two streams:
%% "foo/bar/baz" and "foo/bar/+":
NewStreams = lists:sort(GetStream("foo/bar/baz")),
@ -168,7 +179,7 @@ t_get_streams(_Config) ->
?assert(lists:member(A, AllStreams)),
ok.
t_new_generation_inherit_trie(_Config) ->
t_new_generation_inherit_trie(Config) ->
%% This test checks that we inherit the previous generation's LTS when creating a new
%% generation.
?check_trace(
@ -176,25 +187,25 @@ t_new_generation_inherit_trie(_Config) ->
%% Create a bunch of topics to be learned in the first generation
TS1 = 500,
Batch1 = [
{TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))
|| I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1),
%% Now we create a new generation with the same LTS module. It should inherit the
%% learned trie.
ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
%% Restart the shard, to verify that LTS is persisted.
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Store a batch of messages with the same set of topics.
TS2 = 1_500,
Batch2 = [
{TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))
|| I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
%% We should get only two streams for wildcard query, for "foo" and for "bar".
?assertMatch(
[_Foo, _Bar],
@ -203,29 +214,30 @@ t_new_generation_inherit_trie(_Config) ->
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(bitfield_lts_inherited_trie, Trace)),
?assertMatch([_], ?of_kind(layout_inherited_lts_trie, Trace)),
ok
end
),
ok.
t_replay(_Config) ->
t_replay(Config) ->
%% Create concrete topics:
Topics = [<<"foo/bar">>, <<"foo/bar/baz">>],
Timestamps = lists:seq(1, 10_000, 100),
Values = lists:seq(1, 1_000, 100),
Batch1 = [
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps
make_message(Val, Topic, bin(Val))
|| Topic <- Topics, Val <- Values
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch1),
%% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
Batch2 = [
{TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
|| I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
make_message(Val, make_topic([wildcard, Prefix, suffix, Suffix]), bin(Val))
|| Prefix <- lists:seq(1, 200), Val <- Values, Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
ok = emqx_ds:store_batch(?FUNCTION_NAME, Batch2),
timer:sleep(5_000),
%% Check various topic filters:
Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
Messages = Batch1 ++ Batch2,
%% Missing topics (no ghost messages):
?assertNot(check(?SHARD, <<"missing/foo/bar">>, 0, Messages)),
%% Regular topics:
@ -238,7 +250,7 @@ t_replay(_Config) ->
?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)),
%% Restart the DB to make sure trie is persisted and restored:
ok = emqx_ds:close_db(?FUNCTION_NAME),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DB_CONFIG(Config)),
%% Learned wildcard topics:
?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])),
?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)),
@ -314,6 +326,9 @@ t_non_atomic_store_batch(_Config) ->
).
check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
?tp(notice, ?MODULE_STRING "_check", #{
shard => Shard, tf => TopicFilter, start_time => StartTime
}),
ExpectedFiltered = lists:filter(
fun(#message{topic = Topic, timestamp = TS}) ->
emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime
@ -325,17 +340,9 @@ check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
begin
Dump = dump_messages(Shard, TopicFilter, StartTime),
verify_dump(TopicFilter, StartTime, Dump),
Missing = ExpectedFiltered -- Dump,
Extras = Dump -- ExpectedFiltered,
?assertMatch(
#{missing := [], unexpected := []},
#{
missing => Missing,
unexpected => Extras,
topic_filter => TopicFilter,
start_time => StartTime
}
)
emqx_ds_test_helpers:assert_same_set(ExpectedFiltered, Dump, #{
topic_filter => TopicFilter, start_time => StartTime
})
end,
[]
),
@ -362,6 +369,7 @@ verify_dump(TopicFilter, StartTime, Dump) ->
dump_messages(Shard, TopicFilter, StartTime) ->
Streams = emqx_ds_storage_layer:get_streams(Shard, parse_topic(TopicFilter), StartTime),
ct:pal("Streams for ~p:~n ~p", [TopicFilter, Streams]),
lists:flatmap(
fun({_Rank, Stream}) ->
dump_stream(Shard, Stream, TopicFilter, StartTime)
@ -374,6 +382,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
{ok, Iterator} = emqx_ds_storage_layer:make_iterator(
Shard, Stream, parse_topic(TopicFilter), StartTime
),
ct:pal("Iterator for ~p at stream ~p:~n ~p", [TopicFilter, Stream, Iterator]),
Loop = fun
F(It, 0) ->
error({too_many_iterations, It});
@ -502,24 +511,31 @@ bin(X) ->
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{bitfield_lts, TCs},
{skipstream_lts, TCs}
].
suite() -> [{timetrap, {seconds, 20}}].
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
WorkDir = emqx_cth_suite:work_dir(Config),
Apps = emqx_cth_suite:start(
[emqx_ds_builtin_local],
#{work_dir => emqx_cth_suite:work_dir(Config)}
#{work_dir => WorkDir}
),
[{apps, Apps} | Config].
[{apps, Apps}, {work_dir, WorkDir} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
ok = emqx_cth_suite:stop(Apps),
emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
ok.
init_per_testcase(TC, Config) ->
ok = emqx_ds:open_db(TC, ?DEFAULT_CONFIG),
ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)),
Config.
end_per_testcase(TC, _Config) ->
@ -558,7 +574,7 @@ delete(Shard, Iterators, Selector) ->
fun(Iterator0, {AccIterators, NAcc}) ->
case
emqx_ds_storage_layer:delete_next(
Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us()
Shard, Iterator0, Selector, 10, ?FUTURE
)
of
{ok, end_of_stream} ->
@ -591,7 +607,7 @@ replay(_Shard, []) ->
replay(Shard, Iterators) ->
{NewIterators0, Messages0} = lists:foldl(
fun(Iterator0, {AccIterators, AccMessages}) ->
case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of
case emqx_ds_storage_layer:next(Shard, Iterator0, 10, ?FUTURE) of
{ok, end_of_stream} ->
{AccIterators, AccMessages};
{ok, _Iterator1, []} ->