From 1eb47d0c16a3a592c5d3eb0693ff7536f31e7ba9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 22 Jan 2024 18:18:17 -0300 Subject: [PATCH 1/2] perf(ds): inherit only LTS paths containing wildcards when adding a new generation Fixes https://github.com/emqx/emqx/pull/12338#discussion_r1462139499 --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 148 +++++++++++++++++- .../src/emqx_ds_storage_bitfield_lts.erl | 27 ++-- 2 files changed, 152 insertions(+), 23 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 9d87cf571..226af62f0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -20,7 +20,7 @@ -export([ trie_create/1, trie_create/0, trie_restore/2, - trie_restore_existing/2, + trie_copy_learned_paths/2, topic_key/3, match_topics/2, lookup_topic_key/2 @@ -120,10 +120,6 @@ trie_create() -> -spec trie_restore(options(), [{_Key, _Val}]) -> trie(). trie_restore(Options, Dump) -> Trie = trie_create(Options), - trie_restore_existing(Trie, Dump). - --spec trie_restore_existing(trie(), [{_Key, _Val}]) -> trie(). -trie_restore_existing(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(Trie, StateFrom, Token, StateTo) @@ -132,6 +128,17 @@ trie_restore_existing(Trie, Dump) -> ), Trie. +-spec trie_copy_learned_paths(trie(), trie()) -> trie(). +trie_copy_learned_paths(OldTrie, NewTrie) -> + WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)], + lists:foreach( + fun({{StateFrom, Token}, StateTo}) -> + trie_insert(NewTrie, StateFrom, Token, StateTo) + end, + lists:flatten(WildcardPaths) + ), + NewTrie. + %% @doc Lookup the topic key. Create a new one, if not found. -spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key(). topic_key(Trie, ThresholdFun, Tokens) -> @@ -385,6 +392,41 @@ emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' - ets:lookup(Tab, {State, Token}) ]. +all_emanating(#trie{trie = Tab}, State) -> + ets:select( + Tab, + ets:fun2ms(fun(#trans{key = {S, Edge}, next = Next}) when S == State -> + {{S, Edge}, Next} + end) + ). + +paths(#trie{} = T) -> + Roots = all_emanating(T, ?PREFIX), + lists:flatmap( + fun({Segment, Next}) -> + follow_path(T, Next, [{Segment, Next}]) + end, + Roots + ). + +follow_path(#trie{} = T, State, Path) -> + lists:flatmap( + fun + ({{_State, ?EOT}, _Next} = Segment) -> + [lists:reverse([Segment | Path])]; + ({_Edge, Next} = Segment) -> + follow_path(T, Next, [Segment | Path]) + end, + all_emanating(T, State) + ). + +contains_wildcard([{{_State, ?PLUS}, _Next} | _Rest]) -> + true; +contains_wildcard([_ | Rest]) -> + contains_wildcard(Rest); +contains_wildcard([]) -> + false. + %%================================================================================ %% Tests %%================================================================================ @@ -636,4 +678,100 @@ test_key(Trie, Threshold, Topic0) -> {ok, Ret} = lookup_topic_key(Trie, Topic), Ret. +paths_test() -> + T = trie_create(), + Threshold = 4, + ThresholdFun = fun + (0) -> 1000; + (_) -> Threshold + end, + PathsToInsert = + [ + [''], + [1], + [2, 2], + [3, 3, 3], + [2, 3, 4] + ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold + 2)] ++ + [['', I, ''] || I <- lists:seq(1, Threshold + 2)], + lists:foreach( + fun(PathSpec) -> + test_key(T, ThresholdFun, PathSpec) + end, + PathsToInsert + ), + + %% Test that the paths we've inserted are produced in the output + Paths = paths(T), + FormattedPaths = lists:map(fun format_path/1, Paths), + ExpectedWildcardPaths = + [ + [4, '+', 4], + ['', '+', ''] + ], + ExpectedPaths = + [ + [''], + [1], + [2, 2], + [3, 3, 3] + ] ++ [[4, I, 4] || I <- lists:seq(1, Threshold)] ++ + [['', I, ''] || I <- lists:seq(1, Threshold)] ++ + ExpectedWildcardPaths, + FormatPathSpec = + fun(PathSpec) -> + lists:map( + fun + (I) when is_integer(I) -> integer_to_binary(I); + (A) -> A + end, + PathSpec + ) ++ [?EOT] + end, + lists:foreach( + fun(PathSpec) -> + Path = FormatPathSpec(PathSpec), + ?assert( + lists:member(Path, FormattedPaths), + #{ + paths => FormattedPaths, + expected_path => Path + } + ) + end, + ExpectedPaths + ), + + %% Test filter function for paths containing wildcards + WildcardPaths = lists:filter(fun contains_wildcard/1, Paths), + FormattedWildcardPaths = lists:map(fun format_path/1, WildcardPaths), + ?assertEqual( + sets:from_list(FormattedWildcardPaths, [{version, 2}]), + sets:from_list(lists:map(FormatPathSpec, ExpectedWildcardPaths), [{version, 2}]), + #{ + expected => ExpectedWildcardPaths, + wildcards => FormattedWildcardPaths + } + ), + + %% Test that we're able to reconstruct the same trie from the paths + T2 = trie_create(), + [ + trie_insert(T2, State, Edge, Next) + || Path <- Paths, + {{State, Edge}, Next} <- Path + ], + #trie{trie = Tab1} = T, + #trie{trie = Tab2} = T2, + Dump1 = sets:from_list(ets:tab2list(Tab1), [{version, 2}]), + Dump2 = sets:from_list(ets:tab2list(Tab2), [{version, 2}]), + ?assertEqual(Dump1, Dump2), + + ok. + +format_path([{{_State, Edge}, _Next} | Rest]) -> + [Edge | format_path(Rest)]; +format_path([]) -> + []. + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2a3086a57..d407dab41 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -205,17 +205,15 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> s(). post_creation_actions( #{ - db := DBHandle, - old_gen_id := OldGenId, - old_cf_refs := OldCFRefs, - new_gen_runtime_data := NewGenData0 + new_gen_runtime_data := NewGenData, + old_gen_runtime_data := OldGenData } ) -> - {_, OldTrieCF} = lists:keyfind(trie_cf(OldGenId), 1, OldCFRefs), - #s{trie = NewTrie0} = NewGenData0, - NewTrie = copy_previous_trie(DBHandle, NewTrie0, OldTrieCF), + #s{trie = OldTrie} = OldGenData, + #s{trie = NewTrie0} = NewGenData, + NewTrie = copy_previous_trie(OldTrie, NewTrie0), ?tp(bitfield_lts_inherited_trie, #{}), - NewGenData0#s{trie = NewTrie}. + NewGenData#s{trie = NewTrie}. -spec drop( emqx_ds_storage_layer:shard_id(), @@ -533,16 +531,9 @@ restore_trie(TopicIndexBytes, DB, CF) -> rocksdb:iterator_close(IT) end. --spec copy_previous_trie(rocksdb:db_handle(), emqx_ds_lts:trie(), rocksdb:cf_handle()) -> - emqx_ds_lts:trie(). -copy_previous_trie(DBHandle, NewTrie, OldCF) -> - {ok, IT} = rocksdb:iterator(DBHandle, OldCF, []), - try - OldDump = read_persisted_trie(IT, rocksdb:iterator_move(IT, first)), - emqx_ds_lts:trie_restore_existing(NewTrie, OldDump) - after - rocksdb:iterator_close(IT) - end. +-spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie(). +copy_previous_trie(OldTrie, NewTrie) -> + emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie). read_persisted_trie(IT, {ok, KeyB, ValB}) -> [ From eecd7e084c7747c2cb486c642b2f90af691284d0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 Jan 2024 09:47:03 -0300 Subject: [PATCH 2/2] test(ds): reduce flakiness --- .../test/emqx_persistent_messages_SUITE.erl | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index c46d726f4..d0b939540 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -438,10 +438,19 @@ t_message_gc(Config) -> TopicFilter = emqx_topic:words(<<"#">>), StartTime = 0, Msgs = consume(TopicFilter, StartTime), - %% only "1" and "2" should have been GC'ed - ?assertEqual( - sets:from_list([<<"3">>, <<"4">>], [{version, 2}]), - sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}]) + %% "1" and "2" should have been GC'ed + PresentMessages = sets:from_list( + [emqx_message:payload(Msg) || Msg <- Msgs], + [{version, 2}] + ), + ?assert( + sets:is_empty( + sets:intersection( + PresentMessages, + sets:from_list([<<"1">>, <<"2">>], [{version, 2}]) + ) + ), + #{present_messages => PresentMessages} ), ok