test(dsstore): verify that inherited TLS trie is persisted

This commit is contained in:
Andrew Mayorov 2024-06-17 16:13:22 +02:00
parent 68f6556856
commit 22009bcc58
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 29 additions and 13 deletions

View File

@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) ->
?check_trace( ?check_trace(
begin begin
%% Create a bunch of topics to be learned in the first generation %% Create a bunch of topics to be learned in the first generation
Timestamps = lists:seq(1, 10_000, 100), TS1 = 500,
Batch = [ Batch1 = [
begin {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]),
{TS, make_message(TS, Topic, integer_to_binary(TS))}
end
|| I <- lists:seq(1, 200), || I <- lists:seq(1, 200),
TS <- Timestamps,
Suffix <- [<<"foo">>, <<"bar">>] Suffix <- [<<"foo">>, <<"bar">>]
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
%% Now we create a new generation with the same LTS module. It should inherit the %% Now we create a new generation with the same LTS module. It should inherit the
%% learned trie. %% learned trie.
ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
%% Restart the shard, to verify that LTS is persisted.
ok = application:stop(emqx_durable_storage),
ok = application:start(emqx_durable_storage),
ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
%% Store a batch of messages with the same set of topics.
TS2 = 1_500,
Batch2 = [
{TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
|| I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>]
],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
%% We should get only two streams for wildcard query, for "foo" and for "bar".
?assertMatch(
[_Foo, _Bar],
emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000)
),
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
@ -211,10 +224,7 @@ t_replay(_Config) ->
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
%% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
Batch2 = [ Batch2 = [
begin {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]),
{TS, make_message(TS, Topic, integer_to_binary(TS))}
end
|| I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
payload = Payload payload = Payload
}. }.
make_topic(Tokens = [_ | _]) ->
emqx_topic:join([bin(T) || T <- Tokens]).
payloads(Messages) -> payloads(Messages) ->
lists:map( lists:map(
fun(#message{payload = P}) -> fun(#message{payload = P}) ->
@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
parse_topic(Topic) -> parse_topic(Topic) ->
emqx_topic:words(iolist_to_binary(Topic)). emqx_topic:words(iolist_to_binary(Topic)).
bin(X) ->
emqx_utils_conv:bin(X).
%% CT callbacks %% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).