From 22009bcc5899caa52a905a37d5864a91ccefaccd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 17 Jun 2024 16:13:22 +0200 Subject: [PATCH] test(dsstore): verify that inherited TLS trie is persisted --- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bb6d0f917..004096431 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) -> ?check_trace( begin %% Create a bunch of topics to be learned in the first generation - Timestamps = lists:seq(1, 10_000, 100), - Batch = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + TS1 = 500, + Batch1 = [ + {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))} || I <- lists:seq(1, 200), - TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. - ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), + ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), + %% Restart the shard, to verify that LTS is persisted. + ok = application:stop(emqx_durable_storage), + ok = application:start(emqx_durable_storage), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + %% Store a batch of messages with the same set of topics. + TS2 = 1_500, + Batch2 = [ + {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))} + || I <- lists:seq(1, 200), + Suffix <- [<<"foo">>, <<"bar">>] + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + %% We should get only two streams for wildcard query, for "foo" and for "bar". + ?assertMatch( + [_Foo, _Bar], + emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000) + ), ok end, fun(Trace) -> @@ -211,10 +224,7 @@ t_replay(_Config) -> ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), @@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> payload = Payload }. +make_topic(Tokens = [_ | _]) -> + emqx_topic:join([bin(T) || T <- Tokens]). + payloads(Messages) -> lists:map( fun(#message{payload = P}) -> @@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic) -> emqx_topic:words(iolist_to_binary(Topic)). +bin(X) -> + emqx_utils_conv:bin(X). + %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE).