perf(ds): inherit only LTS paths containing wildcards when adding a new generation
Fixes https://github.com/emqx/emqx/pull/12338#discussion_r1462139499
This commit is contained in:
parent
d122340c13
commit
1eb47d0c16
|
@ -20,7 +20,7 @@
|
|||
-export([
|
||||
trie_create/1, trie_create/0,
|
||||
trie_restore/2,
|
||||
trie_restore_existing/2,
|
||||
trie_copy_learned_paths/2,
|
||||
topic_key/3,
|
||||
match_topics/2,
|
||||
lookup_topic_key/2
|
||||
|
@ -120,10 +120,6 @@ trie_create() ->
|
|||
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
|
||||
trie_restore(Options, Dump) ->
|
||||
Trie = trie_create(Options),
|
||||
trie_restore_existing(Trie, Dump).
|
||||
|
||||
-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
|
||||
trie_restore_existing(Trie, Dump) ->
|
||||
lists:foreach(
|
||||
fun({{StateFrom, Token}, StateTo}) ->
|
||||
trie_insert(Trie, StateFrom, Token, StateTo)
|
||||
|
@ -132,6 +128,17 @@ trie_restore_existing(Trie, Dump) ->
|
|||
),
|
||||
Trie.
|
||||
|
||||
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
|
||||
trie_copy_learned_paths(OldTrie, NewTrie) ->
|
||||
WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)],
|
||||
lists:foreach(
|
||||
fun({{StateFrom, Token}, StateTo}) ->
|
||||
trie_insert(NewTrie, StateFrom, Token, StateTo)
|
||||
end,
|
||||
lists:flatten(WildcardPaths)
|
||||
),
|
||||
NewTrie.
|
||||
|
||||
%% @doc Lookup the topic key. Create a new one, if not found.
|
||||
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
|
||||
topic_key(Trie, ThresholdFun, Tokens) ->
|
||||
|
@ -385,6 +392,41 @@ emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' -
|
|||
ets:lookup(Tab, {State, Token})
|
||||
].
|
||||
|
||||
all_emanating(#trie{trie = Tab}, State) ->
|
||||
ets:select(
|
||||
Tab,
|
||||
ets:fun2ms(fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
|
||||
{{S, Edge}, Next}
|
||||
end)
|
||||
).
|
||||
|
||||
paths(#trie{} = T) ->
|
||||
Roots = all_emanating(T, ?PREFIX),
|
||||
lists:flatmap(
|
||||
fun({Segment, Next}) ->
|
||||
follow_path(T, Next, [{Segment, Next}])
|
||||
end,
|
||||
Roots
|
||||
).
|
||||
|
||||
follow_path(#trie{} = T, State, Path) ->
|
||||
lists:flatmap(
|
||||
fun
|
||||
({{_State, ?EOT}, _Next} = Segment) ->
|
||||
[lists:reverse([Segment | Path])];
|
||||
({_Edge, Next} = Segment) ->
|
||||
follow_path(T, Next, [Segment | Path])
|
||||
end,
|
||||
all_emanating(T, State)
|
||||
).
|
||||
|
||||
contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) ->
|
||||
true;
|
||||
contains_wildcard([_ | Rest]) ->
|
||||
contains_wildcard(Rest);
|
||||
contains_wildcard([]) ->
|
||||
false.
|
||||
|
||||
%%================================================================================
|
||||
%% Tests
|
||||
%%================================================================================
|
||||
|
@ -636,4 +678,100 @@ test_key(Trie, Threshold, Topic0) ->
|
|||
{ok, Ret} = lookup_topic_key(Trie, Topic),
|
||||
Ret.
|
||||
|
||||
paths_test() ->
|
||||
T = trie_create(),
|
||||
Threshold = 4,
|
||||
ThresholdFun = fun
|
||||
(0) -> 1000;
|
||||
(_) -> Threshold
|
||||
end,
|
||||
PathsToInsert =
|
||||
[
|
||||
[''],
|
||||
[1],
|
||||
[2, 2],
|
||||
[3, 3, 3],
|
||||
[2, 3, 4]
|
||||
] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
|
||||
[['', I, ''] || I <- lists:seq(1, Threshold + 2)],
|
||||
lists:foreach(
|
||||
fun(PathSpec) ->
|
||||
test_key(T, ThresholdFun, PathSpec)
|
||||
end,
|
||||
PathsToInsert
|
||||
),
|
||||
|
||||
%% Test that the paths we've inserted are produced in the output
|
||||
Paths = paths(T),
|
||||
FormattedPaths = lists:map(fun format_path/1, Paths),
|
||||
ExpectedWildcardPaths =
|
||||
[
|
||||
[4, '+', 4],
|
||||
['', '+', '']
|
||||
],
|
||||
ExpectedPaths =
|
||||
[
|
||||
[''],
|
||||
[1],
|
||||
[2, 2],
|
||||
[3, 3, 3]
|
||||
] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
|
||||
[['', I, ''] || I <- lists:seq(1, Threshold)] ++
|
||||
ExpectedWildcardPaths,
|
||||
FormatPathSpec =
|
||||
fun(PathSpec) ->
|
||||
lists:map(
|
||||
fun
|
||||
(I) when is_integer(I) -> integer_to_binary(I);
|
||||
(A) -> A
|
||||
end,
|
||||
PathSpec
|
||||
) ++ [?EOT]
|
||||
end,
|
||||
lists:foreach(
|
||||
fun(PathSpec) ->
|
||||
Path = FormatPathSpec(PathSpec),
|
||||
?assert(
|
||||
lists:member(Path, FormattedPaths),
|
||||
#{
|
||||
paths => FormattedPaths,
|
||||
expected_path => Path
|
||||
}
|
||||
)
|
||||
end,
|
||||
ExpectedPaths
|
||||
),
|
||||
|
||||
%% Test filter function for paths containing wildcards
|
||||
WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
|
||||
FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
|
||||
?assertEqual(
|
||||
sets:from_list(FormattedWildcardPaths, [{version, 2}]),
|
||||
sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
|
||||
#{
|
||||
expected => ExpectedWildcardPaths,
|
||||
wildcards => FormattedWildcardPaths
|
||||
}
|
||||
),
|
||||
|
||||
%% Test that we're able to reconstruct the same trie from the paths
|
||||
T2 = trie_create(),
|
||||
[
|
||||
trie_insert(T2, State, Edge, Next)
|
||||
|| Path <- Paths,
|
||||
{{State, Edge}, Next} <- Path
|
||||
],
|
||||
#trie{trie = Tab1} = T,
|
||||
#trie{trie = Tab2} = T2,
|
||||
Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
|
||||
Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
|
||||
?assertEqual(Dump1, Dump2),
|
||||
|
||||
ok.
|
||||
|
||||
format_path([{{_State, Edge}, _Next} | Rest]) ->
|
||||
[Edge | format_path(Rest)];
|
||||
format_path([]) ->
|
||||
[].
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -205,17 +205,15 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|
|||
s().
|
||||
post_creation_actions(
|
||||
#{
|
||||
db := DBHandle,
|
||||
old_gen_id := OldGenId,
|
||||
old_cf_refs := OldCFRefs,
|
||||
new_gen_runtime_data := NewGenData0
|
||||
new_gen_runtime_data := NewGenData,
|
||||
old_gen_runtime_data := OldGenData
|
||||
}
|
||||
) ->
|
||||
{_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
|
||||
#s{trie = NewTrie0} = NewGenData0,
|
||||
NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
|
||||
#s{trie = OldTrie} = OldGenData,
|
||||
#s{trie = NewTrie0} = NewGenData,
|
||||
NewTrie = copy_previous_trie(OldTrie, NewTrie0),
|
||||
?tp(bitfield_lts_inherited_trie, #{}),
|
||||
NewGenData0#s{trie = NewTrie}.
|
||||
NewGenData#s{trie = NewTrie}.
|
||||
|
||||
-spec drop(
|
||||
emqx_ds_storage_layer:shard_id(),
|
||||
|
@ -533,16 +531,9 @@ restore_trie(TopicIndexBytes, DB, CF) ->
|
|||
rocksdb:iterator_close(IT)
|
||||
end.
|
||||
|
||||
-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
|
||||
emqx_ds_lts:trie().
|
||||
copy_previous_trie(DBHandle, NewTrie, OldCF) ->
|
||||
{ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
|
||||
try
|
||||
OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
|
||||
emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
|
||||
after
|
||||
rocksdb:iterator_close(IT)
|
||||
end.
|
||||
-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
|
||||
copy_previous_trie(OldTrie, NewTrie) ->
|
||||
emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
|
||||
|
||||
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
|
||||
[
|
||||
|
|
Loading…
Reference in New Issue