diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index 36ccd656b..71bc8d8a0 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -16,10 +16,10 @@ %% @doc Topic index for matching topics to topic filters. %% -%% Works on top of ETS ordered_set table. Keys are tuples constructed from -%% parsed topic filters and record IDs, wrapped in a tuple to order them -%% strictly greater than unit tuple (`{}`). Existing table may be used if -%% existing keys will not collide with index keys. +%% Works on top of a ordered collection data set, such as ETS ordered_set table. +%% Keys are tuples constructed from parsed topic filters and record IDs, +%% wrapped in a tuple to order them strictly greater than unit tuple (`{}`). +%% Existing table may be used if existing keys will not collide with index keys. %% %% Designed to effectively answer questions like: %% 1. Does any topic filter match given topic? @@ -110,15 +110,10 @@ -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)). -type opts() :: [unique | return_first]. - %% Holds the constant values of each search. -record(ctx, { %% A function which can quickly find the immediate-next record of the given prefix nextf :: nextf(), - %% The initial prefix to start searching from - %% if the input topic starts with a dollar-word, it's the first word like [<<"$SYS">>] - %% otherwise it's a [] - prefix0 :: [word()], %% The initial words of a topic words0 :: [word()], %% Return as soon as there is one match found @@ -129,7 +124,7 @@ -spec make_key(emqx_types:topic(), ID) -> key(ID). make_key(Topic, ID) when is_binary(Topic) -> Words = words(Topic), - case lists:any(fun erlang:is_atom/1, Words) of + case emqx_topic:wildcard(Words) of true -> %% it's a wildcard {Words, {ID}}; @@ -186,10 +181,9 @@ matches(Topic, NextF, Opts) -> %% @doc Entrypoint of the search for a given topic. search(Topic, NextF, Opts) -> - {Words, Prefix} = match_init(Topic), + Words = words(Topic), Context = #ctx{ nextf = NextF, - prefix0 = Prefix, words0 = Words, return_first = proplists:get_bool(return_first, Opts) }, @@ -200,7 +194,15 @@ search(Topic, NextF, Opts) -> false -> [] end, - {MaybeEnd, Acc1} = search_new(Context, base(Prefix), Acc0), + Base = + case hd(Words) of + <<$$, _/binary>> -> + %% skip all filters starts with # or + + base([hd(Words)]); + _ -> + base([]) + end, + {MaybeEnd, Acc1} = search_new(Context, Base, Acc0), Acc = match_topics(Context, Topic, MaybeEnd, Acc1), case is_map(Acc) of true -> @@ -211,18 +213,30 @@ search(Topic, NextF, Opts) -> %% The recursive entrypoint of the trie-search algorithm. %% Always start from the initial prefix and words. -search_new(C, NewBase, Acc) -> - search_moved(C, move_up(C, NewBase), Acc). +search_new(#ctx{words0 = Words} = C, NewBase, Acc) -> + case move_up(C, NewBase) of + ?END -> + {?END, Acc}; + {Filter, _} = T -> + search_plus(C, Words, Filter, [], T, Acc) + end. -search_moved(_, ?END, Acc) -> - {?END, Acc}; -search_moved(#ctx{prefix0 = [], words0 = Words0} = C, {Filter, _} = T, Acc) -> - %% This is not a '$' topic, start from '+' - search_plus(C, Words0, Filter, [], T, Acc); -search_moved(#ctx{prefix0 = Prefix, words0 = Words0} = C, {Filter, _} = T, Acc) -> - [DollarWord] = Prefix, - %% Start from the '$' word - search_up(C, DollarWord, Words0, Filter, [], T, Acc). +%% Try to use '+' as the next word in the prefix. +search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) -> + %% Directly append the current word to the matching prefix (RPrefix). + %% Micro optimization: try not to call the next clause because + %% it is not a continuation. + search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc); +search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) -> + case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of + {T, Acc1} -> + search_up(C, W, Words, Filter, RPrefix, T, Acc1); + TargetMoved -> + TargetMoved + end; +search_plus(C, [W | Words], Filter, RPrefix, T, Acc) -> + %% not a plus + search_up(C, W, Words, Filter, RPrefix, T, Acc). %% Search to the bigger end of ordered collection of topics and topic-filters. search_up(C, Word, Words, Filter, RPrefix, T, Acc) -> @@ -240,23 +254,6 @@ search_up(C, Word, Words, Filter, RPrefix, T, Acc) -> search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc) end. -%% Try to use '+' as the next word in the prefix. -search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) -> - %% Directly append the current word to the matching prefix (RPrefix). - %% Micro optimization: try not to call the next clause because - %% it is not a continuation. - search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc); -search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) -> - case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of - {T, Acc} -> - search_up(C, W, Words, Filter, RPrefix, T, Acc); - TargetMoved -> - TargetMoved - end; -search_plus(C, [W | Words], Filter, RPrefix, T, Acc) -> - %% not a plus - search_up(C, W, Words, Filter, RPrefix, T, Acc). - %% Compare prefix word then the next words in suffix against the search-target %% topic or topic-filter. compare(_, NotFilter, _) when is_binary(NotFilter) -> @@ -290,16 +287,6 @@ match_add(K = {_Filter, ID}, Acc = #{}) -> match_add(K, Acc) -> [K | Acc]. -match_init(Topic) -> - case words(Topic) of - [W = <<"$", _/bytes>> | Rest] -> - % NOTE - % This will effectively skip attempts to match special topics to `#` or `+/...`. - {Rest, [W]}; - Words -> - {Words, []} - end. - -spec words(emqx_types:topic()) -> [word()]. words(Topic) when is_binary(Topic) -> % NOTE @@ -320,9 +307,6 @@ match_topics(#ctx{nextf = NextF} = C, Topic, {Topic, _} = Key, Acc) -> match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic -> %% the last key is a filter, try jump to the topic match_topics(C, Topic, NextF(base(Topic)), Acc); -match_topics(#ctx{nextf = NextF} = C, Topic, continue, Acc) -> - %% the last key is a '+/...' wildcard - match_topics(C, Topic, NextF(base(Topic)), Acc); match_topics(_C, _Topic, _Key, Acc) -> %% gone pass the topic Acc. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index eb0ac0e40..551cc2438 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -170,16 +170,27 @@ t_match8(Config) -> M = get_module(Config), Tab = M:new(), Filters = [<<"+">>, <<"dev/global/sensor">>, <<"dev/+/sensor/#">>], - IDs = [1,2,3], + IDs = [1, 2, 3], Keys = [{F, ID} || F <- Filters, ID <- IDs], - lists:foreach(fun({F, ID}) -> - M:insert(F, ID, <<>>, Tab) - end, Keys), + lists:foreach( + fun({F, ID}) -> + M:insert(F, ID, <<>>, Tab) + end, + Keys + ), Topic = <<"dev/global/sensor">>, Matches = lists:sort(matches(M, Topic, Tab)), - ?assertEqual([<<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>, - <<"dev/global/sensor">>, <<"dev/global/sensor">>, <<"dev/global/sensor">>], - [emqx_topic_index:get_topic(Match) || Match <- Matches]). + ?assertEqual( + [ + <<"dev/+/sensor/#">>, + <<"dev/+/sensor/#">>, + <<"dev/+/sensor/#">>, + <<"dev/global/sensor">>, + <<"dev/global/sensor">>, + <<"dev/global/sensor">> + ], + [emqx_topic_index:get_topic(Match) || Match <- Matches] + ). t_match_fast_forward(Config) -> M = get_module(Config),