From 3c03047c9f94c64185182a1ae91cfe0d0d2b38e9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 15:15:22 +0200 Subject: [PATCH] fix(emqx_trie): performance issue when many levels --- src/emqx.appup.src | 2 ++ src/emqx_trie.erl | 50 ++++++++++++++++++++++++---------------- test/emqx_trie_SUITE.erl | 25 +++++++++++++++++++- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index d162cc15d..2cac1e315 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -5,6 +5,7 @@ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, @@ -15,6 +16,7 @@ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, + {load_module, emqx_trie, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. {load_module, emqx_metrics, brutal_purge, soft_purge, []} diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 8ece333b0..901da2556 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -219,15 +219,22 @@ do_match(Words) -> do_match(Words, empty). do_match(Words, Prefix) -> - match(is_compact(), Words, Prefix, []). + case is_compact() of + true -> match_compact(Words, Prefix, []); + false -> match_no_compact(Words, Prefix, []) + end. -match(_IsCompact, [], Topic, Acc) -> +match_no_compact([], Topic, Acc) -> 'match_#'(Topic) ++ %% try match foo/bar/# lookup_topic(Topic) ++ %% try match foo/bar Acc; -match(IsCompact, [Word | Words], Prefix, Acc0) -> - case {has_prefix(Prefix), IsCompact} of - {false, false} -> +match_no_compact([Word | Words], Prefix, Acc0) -> + case has_prefix(Prefix) of + true -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), Acc1), + match_no_compact(Words, join(Prefix, Word), Acc); + false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix %% we can simpliy return from here @@ -240,21 +247,24 @@ match(IsCompact, [Word | Words], Prefix, Acc0) -> %% then at the second level, we lookup prefix a/x, %% no such prefix to be found, meaning there is no point %% searching for 'a/x/y', 'a/x/+' or 'a/x/#' - Acc0; - _ -> - %% compact paths in database - %% we have to enumerate all possible prefixes - %% e.g. a/+/b/# results with below entries in database - %% - a/+ - %% - a/+/b/# - %% when matching a/x/y, we need to enumerate - %% - a - %% - a/x - %% - a/x/y - %% *with '+', '#' replaced at each level - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match(IsCompact, Words, join(Prefix, '+'), Acc1), - match(IsCompact, Words, join(Prefix, Word), Acc) + Acc0 + end. + +match_compact([], Topic, Acc) -> + 'match_#'(Topic) ++ %% try match foo/bar/# + lookup_topic(Topic) ++ %% try match foo/bar + Acc; +match_compact([Word | Words], Prefix, Acc0) -> + Acc1 = 'match_#'(Prefix) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), Acc1), + WildcardPrefix = join(Prefix, '+'), + %% go deeper to match current_prefix/+ only when: + %% 1. current word is the last + %% OR + %% 2. there is a prefix = 'current_prefix/+' + case Words =:= [] orelse has_prefix(WildcardPrefix) of + true -> match_compact(Words, WildcardPrefix, Acc); + false -> Acc end. 'match_#'(Prefix) -> diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 7516b58a0..23cd1009f 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -105,7 +105,10 @@ t_match3(_) -> Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end), Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]), - ?assertEqual(4, length(Matched)), + case length(Matched) of + 4 -> ok; + _ -> error({unexpected, Matched}) + end, SysMatched = emqx_trie:match(<<"$SYS/a/b/c">>), ?assertEqual([<<"$SYS/#">>], SysMatched). @@ -114,6 +117,26 @@ t_match4(_) -> trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), ?assertEqual([<<"/#">>, <<"/+/a/b/c">>], lists:sort(emqx_trie:match(<<"/0/a/b/c">>))). +t_match5(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + Topics = [<<"#">>, <>, <>], + trans(fun() -> lists:foreach(fun emqx_trie:insert/1, Topics) end), + ?assertEqual([<<"#">>, <>], lists:sort(emqx_trie:match(T))), + ?assertEqual([<<"#">>, <>, <>], + lists:sort(emqx_trie:match(<>))). + +t_match6(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + +t_match7(_) -> + T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, + W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>, + trans(fun() -> emqx_trie:insert(W) end), + ?assertEqual([W], emqx_trie:match(T)). + t_empty(_) -> ?assert(?TRIE:empty()), trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),