From 56b6b176c2c12a986cb007b91017bf2edba81e4e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 13 Oct 2023 19:50:18 +0200 Subject: [PATCH] fix(ds): LTS shall keeps the concrete topic indexes --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 62 +++++++++++-------- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 1 - 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index a6e67c069..c9a73e3e0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -159,7 +159,7 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) -> {ok, FD} = file:open(Filename, [write]), Print = fun (?PREFIX) -> "prefix"; - (NodeId) -> binary:encode_hex(NodeId) + (NodeId) -> integer_to_binary(NodeId, 16) end, io:format(FD, "digraph {~n", []), lists:foreach( @@ -190,12 +190,12 @@ trie_next(#trie{trie = Trie}, State, ?EOT) -> [] -> undefined end; trie_next(#trie{trie = Trie}, State, Token) -> - case ets:lookup(Trie, {State, ?PLUS}) of + case ets:lookup(Trie, {State, Token}) of [#trans{next = Next}] -> - {true, Next}; + {false, Next}; [] -> - case ets:lookup(Trie, {State, Token}) of - [#trans{next = Next}] -> {false, Next}; + case ets:lookup(Trie, {State, ?PLUS}) of + [#trans{next = Next}] -> {true, Next}; [] -> undefined end end. @@ -317,11 +317,11 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> Threshold = ThresholdFun(Depth), Varying = case trie_next_(Trie, State, Tok) of - {NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold -> + {NChildren, _, NextState} when is_integer(NChildren), NChildren >= Threshold -> %% Number of children for the trie node reached the - %% threshold, we need to insert wildcard here: - {_, NextState} = trie_insert(Trie, State, ?PLUS), - [Tok | Varying0]; + %% threshold, we need to insert wildcard here. + {_, _WildcardState} = trie_insert(Trie, State, ?PLUS), + Varying0; {_, false, NextState} -> Varying0; {_, true, NextState} -> @@ -331,6 +331,7 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> end, do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying). +%% @doc Has side effects! Inserts missing elements -spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when New :: false | non_neg_integer(), Wildcard :: boolean(). @@ -471,29 +472,36 @@ wildcard_lookup_test() -> topic_key_test() -> T = trie_create(), try - Threshold = 3, + Threshold = 4, ThresholdFun = fun(0) -> 1000; (_) -> Threshold end, %% Test that bottom layer threshold is high: lists:foreach( fun(I) -> - {_, []} = test_key(T, ThresholdFun, [I, 99, 99, 99]) + {_, []} = test_key(T, ThresholdFun, [I, 99999, 999999, 99999]) end, lists:seq(1, 10)), %% Test adding children on the 2nd level: lists:foreach( fun(I) -> case test_key(T, ThresholdFun, [1, I, 1]) of - {_, []} when I < Threshold -> + {_, []} -> + ?assert(I < Threshold, {I, '<', Threshold}), ok; {_, [Var]} -> + ?assert(I >= Threshold, {I, '>=', Threshold}), ?assertEqual(Var, integer_to_binary(I)) end end, lists:seq(1, 100)), %% This doesn't affect 2nd level with a different prefix: - {_, []} = test_key(T, ThresholdFun, [2, 1, 1]), + ?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 1, 1])), + ?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 10, 1])), + %% This didn't retroactively change the indexes that were + %% created prior to reaching the threshold: + ?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 1, 1])), + ?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 2, 1])), %% Now create another level of +: lists:foreach( fun(I) -> @@ -531,28 +539,29 @@ topic_match_test() -> assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]), assert_match_topics(T, [1, '+', 1], [{S111, []}]), %% Match topics with #: - assert_match_topics(T, [1, '#'], [{S1, []}, {S11, []}, {S12, []}, {S111, []}]), - assert_match_topics(T, [1, 1, '#'], [{S11, []}, {S111, []}]), + assert_match_topics(T, [1, '#'], + [{S1, []}, + {S11, []}, {S12, []}, + {S111, []}]), + assert_match_topics(T, [1, 1, '#'], + [{S11, []}, + {S111, []}]), %% Now add learned wildcards: {S21, []} = test_key(T, ThresholdFun, [2, 1]), {S22, []} = test_key(T, ThresholdFun, [2, 2]), {S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]), - {S2_11, [_]} = test_key(T, ThresholdFun, [2, 1, 1, 1]), - {S2_12, [_]} = test_key(T, ThresholdFun, [2, 1, 1, 2]), - {S2_1_, [_, _]} = test_key(T, ThresholdFun, [2, 1, 1, 3]), - %% Check matching: + {S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]), + {S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]), + {S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]), + %% %% Check matching: assert_match_topics(T, [2, 2], [{S22, []}, {S2_, [<<"2">>]}]), assert_match_topics(T, [2, '+'], [{S22, []}, {S21, []}, {S2_, ['+']}]), - assert_match_topics(T, [2, 1, 1, 2], - [{S2_12, [<<"1">>]}, - {S2_1_, [<<"1">>, <<"2">>]}]), assert_match_topics(T, [2, '#'], [{S21, []}, {S22, []}, {S2_, ['+']}, - {S2_11, ['+']}, {S2_12, ['+']}, - {S2_1_, ['+', '+']}]), + {S2_11, ['+']}, {S2_12, ['+']}, {S2_1_, ['+', '+']}]), ok after dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot")) @@ -578,7 +587,10 @@ assert_match_topics(Trie, Filter0, Expected) -> test_key(Trie, Threshold, Topic0) -> Topic = [integer_to_binary(I) || I <- Topic0], Ret = topic_key(Trie, Threshold, Topic), - Ret = topic_key(Trie, Threshold, Topic), %% Test idempotency + %% Test idempotency: + Ret1 = topic_key(Trie, Threshold, Topic), + ?assertEqual(Ret, Ret1, Topic), + %% Add new key to the history: case get(?keys_history) of undefined -> OldHistory = #{}; OldHistory -> ok diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index f9a7b02c4..22a608a7f 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -168,7 +168,6 @@ t_get_streams(_Config) -> %% ), %% ok. - %% t_create_gen(_Config) -> %% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), %% ?assertEqual(