perf(ds): inherit only LTS paths containing wildcards when adding a new generation
Fixes https://github.com/emqx/emqx/pull/12338#discussion_r1462139499
This commit is contained in:
parent
e4c683d6f8
commit
9003bc5b72
|
@ -20,7 +20,7 @@
|
||||||
-export([
|
-export([
|
||||||
trie_create/1, trie_create/0,
|
trie_create/1, trie_create/0,
|
||||||
trie_restore/2,
|
trie_restore/2,
|
||||||
trie_restore_existing/2,
|
trie_copy_learned_paths/2,
|
||||||
topic_key/3,
|
topic_key/3,
|
||||||
match_topics/2,
|
match_topics/2,
|
||||||
lookup_topic_key/2
|
lookup_topic_key/2
|
||||||
|
@ -120,10 +120,6 @@ trie_create() ->
|
||||||
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
|
-spec trie_restore(options(), [{_Key, _Val}]) -> trie().
|
||||||
trie_restore(Options, Dump) ->
|
trie_restore(Options, Dump) ->
|
||||||
Trie = trie_create(Options),
|
Trie = trie_create(Options),
|
||||||
trie_restore_existing(Trie, Dump).
|
|
||||||
|
|
||||||
-spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie().
|
|
||||||
trie_restore_existing(Trie, Dump) ->
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({{StateFrom, Token}, StateTo}) ->
|
fun({{StateFrom, Token}, StateTo}) ->
|
||||||
trie_insert(Trie, StateFrom, Token, StateTo)
|
trie_insert(Trie, StateFrom, Token, StateTo)
|
||||||
|
@ -132,6 +128,17 @@ trie_restore_existing(Trie, Dump) ->
|
||||||
),
|
),
|
||||||
Trie.
|
Trie.
|
||||||
|
|
||||||
|
%% @doc Copy the learned (wildcard-containing) paths of `OldTrie' into
%% `NewTrie'. Every transition lying on a path that has at least one
%% wildcard edge is inserted into `NewTrie' via `trie_insert/4'; paths
%% without a wildcard are not copied. Returns `NewTrie'.
-spec trie_copy_learned_paths(trie(), trie()) -> trie().
trie_copy_learned_paths(OldTrie, NewTrie) ->
    CopyTransition = fun({{From, Token}, To}) ->
        trie_insert(NewTrie, From, Token, To)
    end,
    CopyIfLearned = fun(Path) ->
        case contains_wildcard(Path) of
            true -> lists:foreach(CopyTransition, Path);
            false -> ok
        end
    end,
    %% Paths that share a prefix repeat the shared transitions, so the same
    %% transition may be handed to trie_insert/4 more than once.
    lists:foreach(CopyIfLearned, paths(OldTrie)),
    NewTrie.
|
||||||
|
|
||||||
%% @doc Lookup the topic key. Create a new one, if not found.
|
%% @doc Lookup the topic key. Create a new one, if not found.
|
||||||
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
|
-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
|
||||||
topic_key(Trie, ThresholdFun, Tokens) ->
|
topic_key(Trie, ThresholdFun, Tokens) ->
|
||||||
|
@ -385,6 +392,41 @@ emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' -
|
||||||
ets:lookup(Tab, {State, Token})
|
ets:lookup(Tab, {State, Token})
|
||||||
].
|
].
|
||||||
|
|
||||||
|
%% List every transition that leaves `State', each returned as
%% `{{State, Edge}, Next}'.
all_emanating(#trie{trie = Tab}, State) ->
    MatchSpec = ets:fun2ms(
        fun(#trans{key = {From, Edge}, next = To}) when From == State ->
            {{From, Edge}, To}
        end
    ),
    ets:select(Tab, MatchSpec).
|
||||||
|
|
||||||
|
%% Enumerate every complete path stored in the trie. Each path is a list of
%% `{{State, Edge}, Next}' transitions leading from the root state
%% (`?PREFIX') to a `?EOT' edge, in root-first order.
%%
%% The walk starts at the root with an empty accumulator, so that a `?EOT'
%% edge leaving the root directly (presumably what a topic with no tokens
%% produces -- confirm against topic_key/3) is reported as a one-transition
%% path. Handling the root edges separately would treat such an edge as an
%% interior one and lose the path.
paths(#trie{} = T) ->
    follow_path(T, ?PREFIX, []).
|
||||||
|
|
||||||
|
%% Depth-first walk over the transitions reachable from `State'.
%% `Acc' holds the transitions taken so far, most recent first.
%% Returns the list of complete paths found below `State'.
follow_path(#trie{} = Trie, State, Acc) ->
    lists:append([
        case Transition of
            {{_From, ?EOT}, _To} ->
                %% Terminal edge: the path is complete, so put it back
                %% into root-first order.
                [lists:reverse([Transition | Acc])];
            {_Key, To} ->
                follow_path(Trie, To, [Transition | Acc])
        end
     || Transition <- all_emanating(Trie, State)
    ]).
|
||||||
|
|
||||||
|
%% Return `true' when at least one transition of the path is taken over a
%% `?PLUS' edge, `false' otherwise (including for the empty path).
contains_wildcard(Path) ->
    lists:any(
        fun
            ({{_From, ?PLUS}, _To}) -> true;
            (_Other) -> false
        end,
        Path
    ).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Tests
|
%% Tests
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -636,4 +678,100 @@ test_key(Trie, Threshold, Topic0) ->
|
||||||
{ok, Ret} = lookup_topic_key(Trie, Topic),
|
{ok, Ret} = lookup_topic_key(Trie, Topic),
|
||||||
Ret.
|
Ret.
|
||||||
|
|
||||||
|
%% Exercises paths/1 and contains_wildcard/1:
%%   1. every expected static and wildcard path is listed by paths/1;
%%   2. filtering with contains_wildcard/1 yields exactly the wildcard paths;
%%   3. re-inserting all listed transitions into a fresh trie reproduces
%%      the contents of the original table.
paths_test() ->
    T = trie_create(),
    Threshold = 4,
    %% Argument 0 gets a very high threshold, everything else `Threshold'
    %% (presumably the argument is the topic level -- confirm against
    %% threshold_fun()).
    ThresholdFun = fun
        (0) -> 1000;
        (_) -> Threshold
    end,
    PathsToInsert =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3],
            [2, 3, 4]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold + 2)],
    lists:foreach(
        fun(PathSpec) ->
            test_key(T, ThresholdFun, PathSpec)
        end,
        PathsToInsert
    ),

    %% Test that the paths we've inserted are produced in the output
    Paths = paths(T),
    FormattedPaths = lists:map(fun format_path/1, Paths),
    ExpectedWildcardPaths =
        [
            [4, '+', 4],
            ['', '+', '']
        ],
    ExpectedPaths =
        [
            [''],
            [1],
            [2, 2],
            [3, 3, 3]
        ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++
            [['', I, ''] || I <- lists:seq(1, Threshold)] ++
            ExpectedWildcardPaths,
    %% Turn a path spec into the edge list produced by format_path/1:
    %% integers become binaries, everything else is kept, `?EOT' is appended.
    FormatPathSpec =
        fun(PathSpec) ->
            lists:map(
                fun
                    (I) when is_integer(I) -> integer_to_binary(I);
                    (A) -> A
                end,
                PathSpec
            ) ++ [?EOT]
        end,
    lists:foreach(
        fun(PathSpec) ->
            Path = FormatPathSpec(PathSpec),
            ?assert(
                lists:member(Path, FormattedPaths),
                #{
                    paths => FormattedPaths,
                    expected_path => Path
                }
            )
        end,
        ExpectedPaths
    ),

    %% Test filter function for paths containing wildcards
    WildcardPaths = lists:filter(fun contains_wildcard/1, Paths),
    FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths),
    %% Expected value goes first so that a failure report labels the two
    %% sides correctly.
    ?assertEqual(
        sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]),
        sets:from_list(FormattedWildcardPaths, [{version, 2}]),
        #{
            expected => ExpectedWildcardPaths,
            wildcards => FormattedWildcardPaths
        }
    ),

    %% Test that we're able to reconstruct the same trie from the paths
    T2 = trie_create(),
    lists:foreach(
        fun(Path) ->
            lists:foreach(
                %% A malformed path entry fails the test here instead of
                %% being skipped silently.
                fun({{State, Edge}, Next}) ->
                    trie_insert(T2, State, Edge, Next)
                end,
                Path
            )
        end,
        Paths
    ),
    #trie{trie = Tab1} = T,
    #trie{trie = Tab2} = T2,
    Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]),
    Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]),
    ?assertEqual(Dump1, Dump2),

    ok.
|
||||||
|
|
||||||
|
%% Reduce a path (a list of `{{State, Edge}, Next}' transitions) to the
%% list of its edges, preserving order.
format_path(Path) ->
    lists:map(fun({{_From, Edge}, _To}) -> Edge end, Path).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -205,17 +205,15 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|
||||||
s().
|
s().
|
||||||
post_creation_actions(
|
post_creation_actions(
|
||||||
#{
|
#{
|
||||||
db := DBHandle,
|
new_gen_runtime_data := NewGenData,
|
||||||
old_gen_id := OldGenId,
|
old_gen_runtime_data := OldGenData
|
||||||
old_cf_refs := OldCFRefs,
|
|
||||||
new_gen_runtime_data := NewGenData0
|
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
{_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs),
|
#s{trie = OldTrie} = OldGenData,
|
||||||
#s{trie = NewTrie0} = NewGenData0,
|
#s{trie = NewTrie0} = NewGenData,
|
||||||
NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF),
|
NewTrie = copy_previous_trie(OldTrie, NewTrie0),
|
||||||
?tp(bitfield_lts_inherited_trie, #{}),
|
?tp(bitfield_lts_inherited_trie, #{}),
|
||||||
NewGenData0#s{trie = NewTrie}.
|
NewGenData#s{trie = NewTrie}.
|
||||||
|
|
||||||
-spec drop(
|
-spec drop(
|
||||||
emqx_ds_storage_layer:shard_id(),
|
emqx_ds_storage_layer:shard_id(),
|
||||||
|
@ -533,16 +531,9 @@ restore_trie(TopicIndexBytes, DB, CF) ->
|
||||||
rocksdb:iterator_close(IT)
|
rocksdb:iterator_close(IT)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) ->
|
-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
|
||||||
emqx_ds_lts:trie().
|
copy_previous_trie(OldTrie, NewTrie) ->
|
||||||
copy_previous_trie(DBHandle, NewTrie, OldCF) ->
|
emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
|
||||||
{ok, IT} = rocksdb:iterator(DBHandle, OldCF, []),
|
|
||||||
try
|
|
||||||
OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)),
|
|
||||||
emqx_ds_lts:trie_restore_existing(NewTrie, OldDump)
|
|
||||||
after
|
|
||||||
rocksdb:iterator_close(IT)
|
|
||||||
end.
|
|
||||||
|
|
||||||
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
|
read_persisted_trie(IT, {ok, KeyB, ValB}) ->
|
||||||
[
|
[
|
||||||
|
|
Loading…
Reference in New Issue