refactor(topic_index): remove more unnecessary next calls

also avoid using records (setelement) for recursive return values
This commit is contained in:
Zaiming (Stone) Shi 2023-08-24 11:51:34 +02:00
parent a1e6635614
commit a30d87e14f
2 changed files with 71 additions and 63 deletions

View File

@ -102,12 +102,15 @@
-export([match/2, matches/3, get_id/1, get_topic/1]). -export([match/2, matches/3, get_id/1, get_topic/1]).
-export_type([key/1, word/0, nextf/0, opts/0]). -export_type([key/1, word/0, nextf/0, opts/0]).
-define(END, '$end_of_table').
-type word() :: binary() | '+' | '#'. -type word() :: binary() | '+' | '#'.
-type base_key() :: {binary() | [word()], {}}. -type base_key() :: {binary() | [word()], {}}.
-type key(ID) :: {binary() | [word()], {ID}}. -type key(ID) :: {binary() | [word()], {ID}}.
-type nextf() :: fun((key(_) | base_key()) -> '$end_of_table' | key(_)). -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
-type opts() :: [unique | return_first]. -type opts() :: [unique | return_first].
%% Holds the constant values of each search. %% Holds the constant values of each search.
-record(ctx, { -record(ctx, {
%% A function which can quickly find the immediate-next record of the given prefix %% A function which can quickly find the immediate-next record of the given prefix
@ -122,18 +125,6 @@
return_first :: boolean() return_first :: boolean()
}). }).
%% Holds the variable parts of each search.
-record(acc, {
%% The current searching target topic/filter
target,
%% The number of moves.
%% This is used to check if the target has been moved
%% after attempting to append '+' to the searching prefix
moves = 0,
%% Search result accumulation
matches = []
}).
%% @doc Make a search-key for the given topic. %% @doc Make a search-key for the given topic.
-spec make_key(emqx_types:topic(), ID) -> key(ID). -spec make_key(emqx_types:topic(), ID) -> key(ID).
make_key(Topic, ID) when is_binary(Topic) -> make_key(Topic, ID) when is_binary(Topic) ->
@ -166,18 +157,14 @@ base(Prefix) ->
{Prefix, {}}. {Prefix, {}}.
%% Move the search target to the key next to the given Base. %% Move the search target to the key next to the given Base.
move_up(#ctx{nextf = NextF}, #acc{moves = Moves} = Acc, Base) -> move_up(#ctx{nextf = NextF}, Base) ->
Acc#acc{target = NextF(Base), moves = Moves + 1}. NextF(Base).
%% The current target key is a match, add it to the accumulation.
add(C, #acc{target = Key} = Acc) ->
add(C, Acc, Key).
%% Add the given key to the accumulation. %% Add the given key to the accumulation.
add(#ctx{return_first = true}, _Acc, Key) -> add(#ctx{return_first = true}, _Acc, Key) ->
throw({return_first, Key}); throw({return_first, Key});
add(_C, #acc{matches = Matches} = Acc, Key) -> add(_C, Acc, Key) ->
Acc#acc{matches = match_add(Key, Matches)}. match_add(Key, Acc).
%% @doc Match given topic against the index and return the first match, or `false` if %% @doc Match given topic against the index and return the first match, or `false` if
%% no match is found. %% no match is found.
@ -206,69 +193,69 @@ search(Topic, NextF, Opts) ->
words0 = Words, words0 = Words,
return_first = proplists:get_bool(return_first, Opts) return_first = proplists:get_bool(return_first, Opts)
}, },
Matches0 = Acc0 =
case proplists:get_bool(unique, Opts) of case proplists:get_bool(unique, Opts) of
true -> true ->
#{}; #{};
false -> false ->
[] []
end, end,
Acc = search_new(Context, base(Prefix), #acc{matches = Matches0}), {MaybeEnd, Acc1} = search_new(Context, base(Prefix), Acc0),
#acc{matches = Matches} = match_non_wildcards(Context, base(Topic), Acc), Acc = match_topics(Context, Topic, MaybeEnd, Acc1),
case is_map(Matches) of case is_map(Acc) of
true -> true ->
maps:values(Matches); maps:values(Acc);
false -> false ->
Matches Acc
end. end.
%% The recursive entrypoint of the trie-search algorithm. %% The recursive entrypoint of the trie-search algorithm.
%% Always start from the initial prefix and words. %% Always start from the initial prefix and words.
search_new(#ctx{prefix0 = Prefix, words0 = Words0} = C, NewBase, Acc0) -> search_new(C, NewBase, Acc) ->
case move_up(C, Acc0, NewBase) of search_moved(C, move_up(C, NewBase), Acc).
#acc{target = '$end_of_table'} = Acc ->
Acc; search_moved(_, ?END, Acc) ->
#acc{target = {Filter, _}} = Acc when Prefix =:= [] -> {?END, Acc};
search_moved(#ctx{prefix0 = [], words0 = Words0} = C, {Filter, _} = T, Acc) ->
%% This is not a '$' topic, start from '+' %% This is not a '$' topic, start from '+'
search_plus(C, Words0, Filter, [], Acc); search_plus(C, Words0, Filter, [], T, Acc);
#acc{target = {Filter, _}} = Acc -> search_moved(#ctx{prefix0 = Prefix, words0 = Words0} = C, {Filter, _} = T, Acc) ->
[DollarWord] = Prefix, [DollarWord] = Prefix,
%% Start from the '$' word %% Start from the '$' word
search_up(C, DollarWord, Words0, Filter, [], Acc) search_up(C, DollarWord, Words0, Filter, [], T, Acc).
end.
%% Search to the bigger end of ordered collection of topics and topic-filters. %% Search to the bigger end of ordered collection of topics and topic-filters.
search_up(C, Word, Words, Filter, RPrefix, #acc{target = Base} = Acc) -> search_up(C, Word, Words, Filter, RPrefix, T, Acc) ->
case compare(Word, Filter, Words) of case compare(Word, Filter, Words) of
{match, full} -> {match, full} ->
search_new(C, Base, add(C, Acc)); search_new(C, T, add(C, Acc, T));
{match, prefix} -> {match, prefix} ->
search_new(C, Base, Acc); search_new(C, T, Acc);
lower -> lower ->
Acc; {T, Acc};
higher -> higher ->
NewBase = base(lists:reverse([Word | RPrefix])), NewBase = base(lists:reverse([Word | RPrefix])),
search_new(C, NewBase, Acc); search_new(C, NewBase, Acc);
shorter -> shorter ->
search_plus(C, Words, tl(Filter), [Word | RPrefix], Acc) search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc)
end. end.
%% Try to use '+' as the next word in the prefix. %% Try to use '+' as the next word in the prefix.
search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, Acc) -> search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) ->
%% Directly append the current word to the matching prefix (RPrefix). %% Directly append the current word to the matching prefix (RPrefix).
%% Micro optimization: try not to call the next clause because %% Micro optimization: try not to call the next clause because
%% it is not a continuation. %% it is not a continuation.
search_plus(C, [X | Words], [X | Filter], [W | RPrefix], Acc); search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc);
search_plus(C, [W | Words], Filter, RPrefix, Acc) -> search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) ->
M = Acc#acc.moves, case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of
case search_up(C, '+', Words, Filter, RPrefix, Acc) of {T, Acc} ->
#acc{moves = M1} = Acc1 when M1 =:= M -> search_up(C, W, Words, Filter, RPrefix, T, Acc);
%% Keep searching for one which has W as the next word TargetMoved ->
search_up(C, W, Words, Filter, RPrefix, Acc1); TargetMoved
Acc1 -> end;
%% Already searched search_plus(C, [W | Words], Filter, RPrefix, T, Acc) ->
Acc1 %% not a plus
end. search_up(C, W, Words, Filter, RPrefix, T, Acc).
%% Compare prefix word then the next words in suffix against the search-target %% Compare prefix word then the next words in suffix against the search-target
%% topic or topic-filter. %% topic or topic-filter.
@ -326,10 +313,16 @@ word(<<"+">>) -> '+';
word(<<"#">>) -> '#'; word(<<"#">>) -> '#';
word(Bin) -> Bin. word(Bin) -> Bin.
match_non_wildcards(#ctx{nextf = NextF} = C, {Topic, _} = Base, Acc) -> %% match non-wildcard topics
case NextF(Base) of match_topics(#ctx{nextf = NextF} = C, Topic, {Topic, _} = Key, Acc) ->
{Topic, _ID} = Key -> %% found a topic match
match_non_wildcards(C, Key, add(C, Acc, Key)); match_topics(C, Topic, NextF(Key), add(C, Acc, Key));
_Other -> match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic ->
Acc %% the last key is a filter, try jump to the topic
end. match_topics(C, Topic, NextF(base(Topic)), Acc);
match_topics(#ctx{nextf = NextF} = C, Topic, continue, Acc) ->
%% the last key is a '+/...' wildcard
match_topics(C, Topic, NextF(base(Topic)), Acc);
match_topics(_C, _Topic, _Key, Acc) ->
%% gone pass the topic
Acc.

View File

@ -166,6 +166,21 @@ t_match7(Config) ->
M:insert(W, t_match7, <<>>, Tab), M:insert(W, t_match7, <<>>, Tab),
?assertEqual(W, topic(match(M, T, Tab))). ?assertEqual(W, topic(match(M, T, Tab))).
t_match8(Config) ->
M = get_module(Config),
Tab = M:new(),
Filters = [<<"+">>, <<"dev/global/sensor">>, <<"dev/+/sensor/#">>],
IDs = [1,2,3],
Keys = [{F, ID} || F <- Filters, ID <- IDs],
lists:foreach(fun({F, ID}) ->
M:insert(F, ID, <<>>, Tab)
end, Keys),
Topic = <<"dev/global/sensor">>,
Matches = lists:sort(matches(M, Topic, Tab)),
?assertEqual([<<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>,
<<"dev/global/sensor">>, <<"dev/global/sensor">>, <<"dev/global/sensor">>],
[emqx_topic_index:get_topic(Match) || Match <- Matches]).
t_match_fast_forward(Config) -> t_match_fast_forward(Config) ->
M = get_module(Config), M = get_module(Config),
Tab = M:new(), Tab = M:new(),