diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index 71bc8d8a0..95174f292 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -110,16 +110,6 @@ -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)). -type opts() :: [unique | return_first]. -%% Holds the constant values of each search. --record(ctx, { - %% A function which can quickly find the immediate-next record of the given prefix - nextf :: nextf(), - %% The initial words of a topic - words0 :: [word()], - %% Return as soon as there is one match found - return_first :: boolean() -}). - %% @doc Make a search-key for the given topic. -spec make_key(emqx_types:topic(), ID) -> key(ID). make_key(Topic, ID) when is_binary(Topic) -> @@ -147,29 +137,29 @@ get_topic({Filter, _ID}) when is_list(Filter) -> get_topic({Topic, _ID}) -> Topic. +-compile({inline, [base/1, move_up/2, match_add/2, compare/3]}). + %% Make the base-key which can be used to locate the desired search target. base(Prefix) -> {Prefix, {}}. -%% Move the search target to the key next to the given Base. -move_up(#ctx{nextf = NextF}, Base) -> - NextF(Base). +base_init([W = <<"$", _/bytes>> | _]) -> + base([W]); +base_init(_) -> + base([]). -%% Add the given key to the accumulation. -add(#ctx{return_first = true}, _Acc, Key) -> - throw({return_first, Key}); -add(_C, Acc, Key) -> - match_add(Key, Acc). +%% Move the search target to the key next to the given Base. +move_up(NextF, Base) -> + NextF(Base). %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. -spec match(emqx_types:topic(), nextf()) -> false | key(_). match(Topic, NextF) -> try search(Topic, NextF, [return_first]) of - [] -> - false + _ -> false catch - throw:{return_first, Res} -> + throw:{first, Res} -> Res end. @@ -182,110 +172,159 @@ matches(Topic, NextF, Opts) -> %% @doc Entrypoint of the search for a given topic. search(Topic, NextF, Opts) -> Words = words(Topic), - Context = #ctx{ - nextf = NextF, - words0 = Words, - return_first = proplists:get_bool(return_first, Opts) - }, + Base = base_init(Words), + ORetFirst = proplists:get_bool(return_first, Opts), + OUnique = proplists:get_bool(unique, Opts), Acc0 = - case proplists:get_bool(unique, Opts) of + case ORetFirst of true -> + first; + false when OUnique -> #{}; false -> [] end, - Base = - case hd(Words) of - <<$$, _/binary>> -> - %% skip all filters starts with # or + - base([hd(Words)]); - _ -> - base([]) + Matches = + case search_new(Words, Base, NextF, Acc0) of + {Cursor, Acc} -> + match_topics(Topic, Cursor, NextF, Acc); + Acc -> + Acc end, - {MaybeEnd, Acc1} = search_new(Context, Base, Acc0), - Acc = match_topics(Context, Topic, MaybeEnd, Acc1), - case is_map(Acc) of + case is_map(Matches) of true -> - maps:values(Acc); + maps:values(Matches); false -> - Acc + Matches end. %% The recursive entrypoint of the trie-search algorithm. %% Always start from the initial prefix and words. -search_new(#ctx{words0 = Words} = C, NewBase, Acc) -> - case move_up(C, NewBase) of +search_new(Words0, NewBase, NextF, Acc) -> + case move_up(NextF, NewBase) of ?END -> - {?END, Acc}; - {Filter, _} = T -> - search_plus(C, Words, Filter, [], T, Acc) + Acc; + Cursor -> + search_up(Words0, Cursor, NextF, Acc) end. -%% Try to use '+' as the next word in the prefix. -search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) -> - %% Directly append the current word to the matching prefix (RPrefix). - %% Micro optimization: try not to call the next clause because - %% it is not a continuation. - search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc); -search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) -> - case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of - {T, Acc1} -> - search_up(C, W, Words, Filter, RPrefix, T, Acc1); - TargetMoved -> - TargetMoved - end; -search_plus(C, [W | Words], Filter, RPrefix, T, Acc) -> - %% not a plus - search_up(C, W, Words, Filter, RPrefix, T, Acc). - %% Search to the bigger end of ordered collection of topics and topic-filters. -search_up(C, Word, Words, Filter, RPrefix, T, Acc) -> - case compare(Word, Filter, Words) of - {match, full} -> - search_new(C, T, add(C, Acc, T)); - {match, prefix} -> - search_new(C, T, Acc); +search_up(Words, {Filter, _} = Cursor, NextF, Acc) -> + case compare(Filter, Words, false) of + match_full -> + search_new(Words, Cursor, NextF, match_add(Cursor, Acc)); + match_prefix -> + search_new(Words, Cursor, NextF, Acc); lower -> - {T, Acc}; - higher -> - NewBase = base(lists:reverse([Word | RPrefix])), - search_new(C, NewBase, Acc); - shorter -> - search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc) + {Cursor, Acc}; + [SeekWord | FilterTail] -> + % NOTE + % This is a seek instruction. + % If we visualize the `Filter` as `FilterHead ++ [_] ++ FilterTail`, we need to + % seek to `FilterHead ++ [SeekWord]`. It carries the `FilterTail` because it's + % much cheaper to return it from `compare/3` than anything more usable. + NewBase = base(seek(SeekWord, Filter, FilterTail)), + search_new(Words, NewBase, NextF, Acc) end. -%% Compare prefix word then the next words in suffix against the search-target -%% topic or topic-filter. -compare(_, NotFilter, _) when is_binary(NotFilter) -> - lower; -compare(H, [H | Filter], Words) -> - compare(Filter, Words); -compare(_, ['#'], _Words) -> - {match, full}; -compare(H1, [H2 | _T2], _Words) when H1 < H2 -> - lower; -compare(_H, [_ | _], _Words) -> - higher. +seek(SeekWord, [_ | FilterTail], FilterTail) -> + [SeekWord]; +seek(SeekWord, [FilterWord | Rest], FilterTail) -> + [FilterWord | seek(SeekWord, Rest, FilterTail)]. -%% Now compare the filter suffix and the topic suffix. -compare([], []) -> - {match, full}; -compare([], _Words) -> - {match, prefix}; -compare(['#'], _Words) -> - {match, full}; -compare([_ | _], []) -> +compare(NotFilter, _, _) when is_binary(NotFilter) -> lower; -compare([_ | _], _Words) -> - %% cannot know if it's a match, lower, or higher, - %% must search with a longer prefix. - shorter. +compare([], [], _) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/+/d + % We matched the topic to a topic filter exactly (possibly with pluses). + % We include it in the result set, and now need to try next entry in the table. + % Closest possible next entries that we must not miss: + % * a/+/+/d (same topic but a different ID) + % * a/+/+/d/# (also a match) + match_full; +compare([], _Words, _) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/c + % We found out that a topic filter is a prefix of the topic (possibly with pluses). + % We discard it, and now need to try next entry in the table. + % Closest possible next entries that we must not miss: + % * a/+/c/# (which is a match) + % * a/+/c/+ (also a match) + % + % TODO + % We might probably instead seek to a/+/c/# right away. + match_prefix; +compare(['#'], _Words, _) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/+/d/# + % We matched the topic to a topic filter with wildcard (possibly with pluses). + % We include it in the result set, and now need to try next entry in the table. + % Closest possible next entries that we must not miss: + % * a/+/+/d/# (same topic but a different ID) + match_full; +compare(['+' | TF], [HW | TW], _PrevBacktrack) -> + % NOTE + % We need to keep backtrack point each time we encounter a plus. To safely skip over + % parts of the search space, we may need last backtrack point when recursion terminates. + % See next clauses for examples. + compare(TF, TW, [HW | TF]); +compare([HW | TF], [HW | TW], Backtrack) -> + % NOTE + % Skip over the same word in both topic and filter, keeping the last backtrack point. + compare(TF, TW, Backtrack); +compare([HF | _], [HW | _], false) when HF > HW -> + % NOTE + % Topic: a/b/c/d + % Filter: a/b/c/e/1 + % The topic is lower than a topic filter. There's nowhere to backtrackto, we're out of + % the search space. We should stop the search. + lower; +compare([HF | _], [HW | _], Backtrack) when HF > HW -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/+/e/1 + % The topic is lower than a topic filter. There was a plus, last time at the 3rd level, + % we have a backtrack point to seek to: + % Seek: [c | e/1] + % We need to skip over part of search space, and seek to the next possible match: + % Next: a/+/c + Backtrack; +compare([_ | _], [], false) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/b/c/d/1 + % The topic is lower than a topic filter. (since it's shorter). There's nowhere to + % backtrack to, we're out of the search space. We should stop the search. + lower; +compare([_ | _], [], Backtrack) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/c/d/1 + % The topic is lower than a topic filter. There was a plus, last and only time at the + % 3rd level, we have a backtrack point: + % Seek: [b | c/d/1] + % Next: a/b + Backtrack; +compare([_HF | TF], [HW | _], _) -> + % NOTE + % Topic: a/b/c/d + % Filter: a/+/+/0/1/2 + % Topic is higher than the filter, we need to skip over to the next possible filter. + % Seek: [d | 0/1/2] + % Next: a/+/+/d + [HW | TF]. match_add(K = {_Filter, ID}, Acc = #{}) -> % NOTE: ensuring uniqueness by record ID Acc#{ID => K}; -match_add(K, Acc) -> - [K | Acc]. +match_add(K, Acc) when is_list(Acc) -> + [K | Acc]; +match_add(K, first) -> + throw({first, K}). -spec words(emqx_types:topic()) -> [word()]. words(Topic) when is_binary(Topic) -> @@ -301,12 +340,12 @@ word(<<"#">>) -> '#'; word(Bin) -> Bin. %% match non-wildcard topics -match_topics(#ctx{nextf = NextF} = C, Topic, {Topic, _} = Key, Acc) -> +match_topics(Topic, {Topic, _} = Key, NextF, Acc) -> %% found a topic match - match_topics(C, Topic, NextF(Key), add(C, Acc, Key)); -match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic -> + match_topics(Topic, NextF(Key), NextF, match_add(Key, Acc)); +match_topics(Topic, {F, _}, NextF, Acc) when F < Topic -> %% the last key is a filter, try jump to the topic - match_topics(C, Topic, NextF(base(Topic)), Acc); -match_topics(_C, _Topic, _Key, Acc) -> + match_topics(Topic, NextF(base(Topic)), NextF, Acc); +match_topics(_Topic, _Key, _NextF, Acc) -> %% gone pass the topic Acc. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 551cc2438..08056a16f 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -256,6 +256,19 @@ t_match_wildcard_edge_cases(Config) -> end, lists:foreach(F, Datasets). +t_prop_edgecase(Config) -> + M = get_module(Config), + Tab = M:new(), + Topic = <<"01/01">>, + Filters = [ + {1, <<>>}, + {2, <<"+/01">>}, + {3, <<>>}, + {4, <<"+/+/01">>} + ], + _ = [M:insert(F, N, <<>>, Tab) || {N, F} <- Filters], + ?assertMatch([2], [id(X) || X <- matches(M, Topic, Tab, [unique])]). + t_prop_matches(Config) -> M = get_module(Config), ?assert( diff --git a/rebar.config.erl b/rebar.config.erl index 02e946e15..8f26d11d8 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -190,7 +190,8 @@ test_deps() -> {meck, "0.9.2"}, {proper, "1.4.0"}, {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}}, - {erl_csv, "0.2.0"} + {erl_csv, "0.2.0"}, + {eministat, "0.10.1"} ]. common_compile_opts() ->