fix(ds): LTS shall keeps the concrete topic indexes
This commit is contained in:
parent
7428e7037b
commit
56b6b176c2
|
@ -159,7 +159,7 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
|
||||||
{ok, FD} = file:open(Filename, [write]),
|
{ok, FD} = file:open(Filename, [write]),
|
||||||
Print = fun
|
Print = fun
|
||||||
(?PREFIX) -> "prefix";
|
(?PREFIX) -> "prefix";
|
||||||
(NodeId) -> binary:encode_hex(NodeId)
|
(NodeId) -> integer_to_binary(NodeId, 16)
|
||||||
end,
|
end,
|
||||||
io:format(FD, "digraph {~n", []),
|
io:format(FD, "digraph {~n", []),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -190,12 +190,12 @@ trie_next(#trie{trie = Trie}, State, ?EOT) ->
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end;
|
end;
|
||||||
trie_next(#trie{trie = Trie}, State, Token) ->
|
trie_next(#trie{trie = Trie}, State, Token) ->
|
||||||
case ets:lookup(Trie, {State, ?PLUS}) of
|
case ets:lookup(Trie, {State, Token}) of
|
||||||
[#trans{next = Next}] ->
|
[#trans{next = Next}] ->
|
||||||
{true, Next};
|
{false, Next};
|
||||||
[] ->
|
[] ->
|
||||||
case ets:lookup(Trie, {State, Token}) of
|
case ets:lookup(Trie, {State, ?PLUS}) of
|
||||||
[#trans{next = Next}] -> {false, Next};
|
[#trans{next = Next}] -> {true, Next};
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
@ -317,11 +317,11 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
|
||||||
Threshold = ThresholdFun(Depth),
|
Threshold = ThresholdFun(Depth),
|
||||||
Varying =
|
Varying =
|
||||||
case trie_next_(Trie, State, Tok) of
|
case trie_next_(Trie, State, Tok) of
|
||||||
{NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold ->
|
{NChildren, _, NextState} when is_integer(NChildren), NChildren >= Threshold ->
|
||||||
%% Number of children for the trie node reached the
|
%% Number of children for the trie node reached the
|
||||||
%% threshold, we need to insert wildcard here:
|
%% threshold, we need to insert wildcard here.
|
||||||
{_, NextState} = trie_insert(Trie, State, ?PLUS),
|
{_, _WildcardState} = trie_insert(Trie, State, ?PLUS),
|
||||||
[Tok | Varying0];
|
Varying0;
|
||||||
{_, false, NextState} ->
|
{_, false, NextState} ->
|
||||||
Varying0;
|
Varying0;
|
||||||
{_, true, NextState} ->
|
{_, true, NextState} ->
|
||||||
|
@ -331,6 +331,7 @@ do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
|
||||||
end,
|
end,
|
||||||
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).
|
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).
|
||||||
|
|
||||||
|
%% @doc Has side effects! Inserts missing elements
|
||||||
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
|
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
|
||||||
New :: false | non_neg_integer(),
|
New :: false | non_neg_integer(),
|
||||||
Wildcard :: boolean().
|
Wildcard :: boolean().
|
||||||
|
@ -471,29 +472,36 @@ wildcard_lookup_test() ->
|
||||||
topic_key_test() ->
|
topic_key_test() ->
|
||||||
T = trie_create(),
|
T = trie_create(),
|
||||||
try
|
try
|
||||||
Threshold = 3,
|
Threshold = 4,
|
||||||
ThresholdFun = fun(0) -> 1000;
|
ThresholdFun = fun(0) -> 1000;
|
||||||
(_) -> Threshold
|
(_) -> Threshold
|
||||||
end,
|
end,
|
||||||
%% Test that bottom layer threshold is high:
|
%% Test that bottom layer threshold is high:
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
{_, []} = test_key(T, ThresholdFun, [I, 99, 99, 99])
|
{_, []} = test_key(T, ThresholdFun, [I, 99999, 999999, 99999])
|
||||||
end,
|
end,
|
||||||
lists:seq(1, 10)),
|
lists:seq(1, 10)),
|
||||||
%% Test adding children on the 2nd level:
|
%% Test adding children on the 2nd level:
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
case test_key(T, ThresholdFun, [1, I, 1]) of
|
case test_key(T, ThresholdFun, [1, I, 1]) of
|
||||||
{_, []} when I < Threshold ->
|
{_, []} ->
|
||||||
|
?assert(I < Threshold, {I, '<', Threshold}),
|
||||||
ok;
|
ok;
|
||||||
{_, [Var]} ->
|
{_, [Var]} ->
|
||||||
|
?assert(I >= Threshold, {I, '>=', Threshold}),
|
||||||
?assertEqual(Var, integer_to_binary(I))
|
?assertEqual(Var, integer_to_binary(I))
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
lists:seq(1, 100)),
|
lists:seq(1, 100)),
|
||||||
%% This doesn't affect 2nd level with a different prefix:
|
%% This doesn't affect 2nd level with a different prefix:
|
||||||
{_, []} = test_key(T, ThresholdFun, [2, 1, 1]),
|
?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 1, 1])),
|
||||||
|
?assertMatch({_, []}, test_key(T, ThresholdFun, [2, 10, 1])),
|
||||||
|
%% This didn't retroactively change the indexes that were
|
||||||
|
%% created prior to reaching the threshold:
|
||||||
|
?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 1, 1])),
|
||||||
|
?assertMatch({_, []}, test_key(T, ThresholdFun, [1, 2, 1])),
|
||||||
%% Now create another level of +:
|
%% Now create another level of +:
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
|
@ -531,28 +539,29 @@ topic_match_test() ->
|
||||||
assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]),
|
assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]),
|
||||||
assert_match_topics(T, [1, '+', 1], [{S111, []}]),
|
assert_match_topics(T, [1, '+', 1], [{S111, []}]),
|
||||||
%% Match topics with #:
|
%% Match topics with #:
|
||||||
assert_match_topics(T, [1, '#'], [{S1, []}, {S11, []}, {S12, []}, {S111, []}]),
|
assert_match_topics(T, [1, '#'],
|
||||||
assert_match_topics(T, [1, 1, '#'], [{S11, []}, {S111, []}]),
|
[{S1, []},
|
||||||
|
{S11, []}, {S12, []},
|
||||||
|
{S111, []}]),
|
||||||
|
assert_match_topics(T, [1, 1, '#'],
|
||||||
|
[{S11, []},
|
||||||
|
{S111, []}]),
|
||||||
%% Now add learned wildcards:
|
%% Now add learned wildcards:
|
||||||
{S21, []} = test_key(T, ThresholdFun, [2, 1]),
|
{S21, []} = test_key(T, ThresholdFun, [2, 1]),
|
||||||
{S22, []} = test_key(T, ThresholdFun, [2, 2]),
|
{S22, []} = test_key(T, ThresholdFun, [2, 2]),
|
||||||
{S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
|
{S2_, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3]),
|
||||||
{S2_11, [_]} = test_key(T, ThresholdFun, [2, 1, 1, 1]),
|
{S2_11, [<<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 1]),
|
||||||
{S2_12, [_]} = test_key(T, ThresholdFun, [2, 1, 1, 2]),
|
{S2_12, [<<"4">>]} = test_key(T, ThresholdFun, [2, 4, 1, 2]),
|
||||||
{S2_1_, [_, _]} = test_key(T, ThresholdFun, [2, 1, 1, 3]),
|
{S2_1_, [<<"3">>, <<"3">>]} = test_key(T, ThresholdFun, [2, 3, 1, 3]),
|
||||||
%% Check matching:
|
%% %% Check matching:
|
||||||
assert_match_topics(T, [2, 2],
|
assert_match_topics(T, [2, 2],
|
||||||
[{S22, []}, {S2_, [<<"2">>]}]),
|
[{S22, []}, {S2_, [<<"2">>]}]),
|
||||||
assert_match_topics(T, [2, '+'],
|
assert_match_topics(T, [2, '+'],
|
||||||
[{S22, []}, {S21, []}, {S2_, ['+']}]),
|
[{S22, []}, {S21, []}, {S2_, ['+']}]),
|
||||||
assert_match_topics(T, [2, 1, 1, 2],
|
|
||||||
[{S2_12, [<<"1">>]},
|
|
||||||
{S2_1_, [<<"1">>, <<"2">>]}]),
|
|
||||||
assert_match_topics(T, [2, '#'],
|
assert_match_topics(T, [2, '#'],
|
||||||
[{S21, []}, {S22, []},
|
[{S21, []}, {S22, []},
|
||||||
{S2_, ['+']},
|
{S2_, ['+']},
|
||||||
{S2_11, ['+']}, {S2_12, ['+']},
|
{S2_11, ['+']}, {S2_12, ['+']}, {S2_1_, ['+', '+']}]),
|
||||||
{S2_1_, ['+', '+']}]),
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
|
dump_to_dot(T, filename:join("_build", atom_to_list(?FUNCTION_NAME) ++ ".dot"))
|
||||||
|
@ -578,7 +587,10 @@ assert_match_topics(Trie, Filter0, Expected) ->
|
||||||
test_key(Trie, Threshold, Topic0) ->
|
test_key(Trie, Threshold, Topic0) ->
|
||||||
Topic = [integer_to_binary(I) || I <- Topic0],
|
Topic = [integer_to_binary(I) || I <- Topic0],
|
||||||
Ret = topic_key(Trie, Threshold, Topic),
|
Ret = topic_key(Trie, Threshold, Topic),
|
||||||
Ret = topic_key(Trie, Threshold, Topic), %% Test idempotency
|
%% Test idempotency:
|
||||||
|
Ret1 = topic_key(Trie, Threshold, Topic),
|
||||||
|
?assertEqual(Ret, Ret1, Topic),
|
||||||
|
%% Add new key to the history:
|
||||||
case get(?keys_history) of
|
case get(?keys_history) of
|
||||||
undefined -> OldHistory = #{};
|
undefined -> OldHistory = #{};
|
||||||
OldHistory -> ok
|
OldHistory -> ok
|
||||||
|
|
|
@ -168,7 +168,6 @@ t_get_streams(_Config) ->
|
||||||
%% ),
|
%% ),
|
||||||
%% ok.
|
%% ok.
|
||||||
|
|
||||||
|
|
||||||
%% t_create_gen(_Config) ->
|
%% t_create_gen(_Config) ->
|
||||||
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG),
|
||||||
%% ?assertEqual(
|
%% ?assertEqual(
|
||||||
|
|
Loading…
Reference in New Issue