refactor(topic_index): less special handling for leading $ words
This commit is contained in:
parent
a30d87e14f
commit
62423b0b12
|
@ -16,10 +16,10 @@
|
||||||
|
|
||||||
%% @doc Topic index for matching topics to topic filters.
|
%% @doc Topic index for matching topics to topic filters.
|
||||||
%%
|
%%
|
||||||
%% Works on top of ETS ordered_set table. Keys are tuples constructed from
|
%% Works on top of a ordered collection data set, such as ETS ordered_set table.
|
||||||
%% parsed topic filters and record IDs, wrapped in a tuple to order them
|
%% Keys are tuples constructed from parsed topic filters and record IDs,
|
||||||
%% strictly greater than unit tuple (`{}`). Existing table may be used if
|
%% wrapped in a tuple to order them strictly greater than unit tuple (`{}`).
|
||||||
%% existing keys will not collide with index keys.
|
%% Existing table may be used if existing keys will not collide with index keys.
|
||||||
%%
|
%%
|
||||||
%% Designed to effectively answer questions like:
|
%% Designed to effectively answer questions like:
|
||||||
%% 1. Does any topic filter match given topic?
|
%% 1. Does any topic filter match given topic?
|
||||||
|
@ -110,15 +110,10 @@
|
||||||
-type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
|
-type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
|
||||||
-type opts() :: [unique | return_first].
|
-type opts() :: [unique | return_first].
|
||||||
|
|
||||||
|
|
||||||
%% Holds the constant values of each search.
|
%% Holds the constant values of each search.
|
||||||
-record(ctx, {
|
-record(ctx, {
|
||||||
%% A function which can quickly find the immediate-next record of the given prefix
|
%% A function which can quickly find the immediate-next record of the given prefix
|
||||||
nextf :: nextf(),
|
nextf :: nextf(),
|
||||||
%% The initial prefix to start searching from
|
|
||||||
%% if the input topic starts with a dollar-word, it's the first word like [<<"$SYS">>]
|
|
||||||
%% otherwise it's a []
|
|
||||||
prefix0 :: [word()],
|
|
||||||
%% The initial words of a topic
|
%% The initial words of a topic
|
||||||
words0 :: [word()],
|
words0 :: [word()],
|
||||||
%% Return as soon as there is one match found
|
%% Return as soon as there is one match found
|
||||||
|
@ -129,7 +124,7 @@
|
||||||
-spec make_key(emqx_types:topic(), ID) -> key(ID).
|
-spec make_key(emqx_types:topic(), ID) -> key(ID).
|
||||||
make_key(Topic, ID) when is_binary(Topic) ->
|
make_key(Topic, ID) when is_binary(Topic) ->
|
||||||
Words = words(Topic),
|
Words = words(Topic),
|
||||||
case lists:any(fun erlang:is_atom/1, Words) of
|
case emqx_topic:wildcard(Words) of
|
||||||
true ->
|
true ->
|
||||||
%% it's a wildcard
|
%% it's a wildcard
|
||||||
{Words, {ID}};
|
{Words, {ID}};
|
||||||
|
@ -186,10 +181,9 @@ matches(Topic, NextF, Opts) ->
|
||||||
|
|
||||||
%% @doc Entrypoint of the search for a given topic.
|
%% @doc Entrypoint of the search for a given topic.
|
||||||
search(Topic, NextF, Opts) ->
|
search(Topic, NextF, Opts) ->
|
||||||
{Words, Prefix} = match_init(Topic),
|
Words = words(Topic),
|
||||||
Context = #ctx{
|
Context = #ctx{
|
||||||
nextf = NextF,
|
nextf = NextF,
|
||||||
prefix0 = Prefix,
|
|
||||||
words0 = Words,
|
words0 = Words,
|
||||||
return_first = proplists:get_bool(return_first, Opts)
|
return_first = proplists:get_bool(return_first, Opts)
|
||||||
},
|
},
|
||||||
|
@ -200,7 +194,15 @@ search(Topic, NextF, Opts) ->
|
||||||
false ->
|
false ->
|
||||||
[]
|
[]
|
||||||
end,
|
end,
|
||||||
{MaybeEnd, Acc1} = search_new(Context, base(Prefix), Acc0),
|
Base =
|
||||||
|
case hd(Words) of
|
||||||
|
<<$$, _/binary>> ->
|
||||||
|
%% skip all filters starts with # or +
|
||||||
|
base([hd(Words)]);
|
||||||
|
_ ->
|
||||||
|
base([])
|
||||||
|
end,
|
||||||
|
{MaybeEnd, Acc1} = search_new(Context, Base, Acc0),
|
||||||
Acc = match_topics(Context, Topic, MaybeEnd, Acc1),
|
Acc = match_topics(Context, Topic, MaybeEnd, Acc1),
|
||||||
case is_map(Acc) of
|
case is_map(Acc) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -211,18 +213,30 @@ search(Topic, NextF, Opts) ->
|
||||||
|
|
||||||
%% The recursive entrypoint of the trie-search algorithm.
|
%% The recursive entrypoint of the trie-search algorithm.
|
||||||
%% Always start from the initial prefix and words.
|
%% Always start from the initial prefix and words.
|
||||||
search_new(C, NewBase, Acc) ->
|
search_new(#ctx{words0 = Words} = C, NewBase, Acc) ->
|
||||||
search_moved(C, move_up(C, NewBase), Acc).
|
case move_up(C, NewBase) of
|
||||||
|
?END ->
|
||||||
search_moved(_, ?END, Acc) ->
|
|
||||||
{?END, Acc};
|
{?END, Acc};
|
||||||
search_moved(#ctx{prefix0 = [], words0 = Words0} = C, {Filter, _} = T, Acc) ->
|
{Filter, _} = T ->
|
||||||
%% This is not a '$' topic, start from '+'
|
search_plus(C, Words, Filter, [], T, Acc)
|
||||||
search_plus(C, Words0, Filter, [], T, Acc);
|
end.
|
||||||
search_moved(#ctx{prefix0 = Prefix, words0 = Words0} = C, {Filter, _} = T, Acc) ->
|
|
||||||
[DollarWord] = Prefix,
|
%% Try to use '+' as the next word in the prefix.
|
||||||
%% Start from the '$' word
|
search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) ->
|
||||||
search_up(C, DollarWord, Words0, Filter, [], T, Acc).
|
%% Directly append the current word to the matching prefix (RPrefix).
|
||||||
|
%% Micro optimization: try not to call the next clause because
|
||||||
|
%% it is not a continuation.
|
||||||
|
search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc);
|
||||||
|
search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) ->
|
||||||
|
case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of
|
||||||
|
{T, Acc1} ->
|
||||||
|
search_up(C, W, Words, Filter, RPrefix, T, Acc1);
|
||||||
|
TargetMoved ->
|
||||||
|
TargetMoved
|
||||||
|
end;
|
||||||
|
search_plus(C, [W | Words], Filter, RPrefix, T, Acc) ->
|
||||||
|
%% not a plus
|
||||||
|
search_up(C, W, Words, Filter, RPrefix, T, Acc).
|
||||||
|
|
||||||
%% Search to the bigger end of ordered collection of topics and topic-filters.
|
%% Search to the bigger end of ordered collection of topics and topic-filters.
|
||||||
search_up(C, Word, Words, Filter, RPrefix, T, Acc) ->
|
search_up(C, Word, Words, Filter, RPrefix, T, Acc) ->
|
||||||
|
@ -240,23 +254,6 @@ search_up(C, Word, Words, Filter, RPrefix, T, Acc) ->
|
||||||
search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc)
|
search_plus(C, Words, tl(Filter), [Word | RPrefix], T, Acc)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Try to use '+' as the next word in the prefix.
|
|
||||||
search_plus(C, [W, X | Words], [W, X | Filter], RPrefix, T, Acc) ->
|
|
||||||
%% Directly append the current word to the matching prefix (RPrefix).
|
|
||||||
%% Micro optimization: try not to call the next clause because
|
|
||||||
%% it is not a continuation.
|
|
||||||
search_plus(C, [X | Words], [X | Filter], [W | RPrefix], T, Acc);
|
|
||||||
search_plus(C, [W | Words], ['+' | _] = Filter, RPrefix, T, Acc) ->
|
|
||||||
case search_up(C, '+', Words, Filter, RPrefix, T, Acc) of
|
|
||||||
{T, Acc} ->
|
|
||||||
search_up(C, W, Words, Filter, RPrefix, T, Acc);
|
|
||||||
TargetMoved ->
|
|
||||||
TargetMoved
|
|
||||||
end;
|
|
||||||
search_plus(C, [W | Words], Filter, RPrefix, T, Acc) ->
|
|
||||||
%% not a plus
|
|
||||||
search_up(C, W, Words, Filter, RPrefix, T, Acc).
|
|
||||||
|
|
||||||
%% Compare prefix word then the next words in suffix against the search-target
|
%% Compare prefix word then the next words in suffix against the search-target
|
||||||
%% topic or topic-filter.
|
%% topic or topic-filter.
|
||||||
compare(_, NotFilter, _) when is_binary(NotFilter) ->
|
compare(_, NotFilter, _) when is_binary(NotFilter) ->
|
||||||
|
@ -290,16 +287,6 @@ match_add(K = {_Filter, ID}, Acc = #{}) ->
|
||||||
match_add(K, Acc) ->
|
match_add(K, Acc) ->
|
||||||
[K | Acc].
|
[K | Acc].
|
||||||
|
|
||||||
match_init(Topic) ->
|
|
||||||
case words(Topic) of
|
|
||||||
[W = <<"$", _/bytes>> | Rest] ->
|
|
||||||
% NOTE
|
|
||||||
% This will effectively skip attempts to match special topics to `#` or `+/...`.
|
|
||||||
{Rest, [W]};
|
|
||||||
Words ->
|
|
||||||
{Words, []}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec words(emqx_types:topic()) -> [word()].
|
-spec words(emqx_types:topic()) -> [word()].
|
||||||
words(Topic) when is_binary(Topic) ->
|
words(Topic) when is_binary(Topic) ->
|
||||||
% NOTE
|
% NOTE
|
||||||
|
@ -320,9 +307,6 @@ match_topics(#ctx{nextf = NextF} = C, Topic, {Topic, _} = Key, Acc) ->
|
||||||
match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic ->
|
match_topics(#ctx{nextf = NextF} = C, Topic, {F, _}, Acc) when F < Topic ->
|
||||||
%% the last key is a filter, try jump to the topic
|
%% the last key is a filter, try jump to the topic
|
||||||
match_topics(C, Topic, NextF(base(Topic)), Acc);
|
match_topics(C, Topic, NextF(base(Topic)), Acc);
|
||||||
match_topics(#ctx{nextf = NextF} = C, Topic, continue, Acc) ->
|
|
||||||
%% the last key is a '+/...' wildcard
|
|
||||||
match_topics(C, Topic, NextF(base(Topic)), Acc);
|
|
||||||
match_topics(_C, _Topic, _Key, Acc) ->
|
match_topics(_C, _Topic, _Key, Acc) ->
|
||||||
%% gone pass the topic
|
%% gone pass the topic
|
||||||
Acc.
|
Acc.
|
||||||
|
|
|
@ -172,14 +172,25 @@ t_match8(Config) ->
|
||||||
Filters = [<<"+">>, <<"dev/global/sensor">>, <<"dev/+/sensor/#">>],
|
Filters = [<<"+">>, <<"dev/global/sensor">>, <<"dev/+/sensor/#">>],
|
||||||
IDs = [1, 2, 3],
|
IDs = [1, 2, 3],
|
||||||
Keys = [{F, ID} || F <- Filters, ID <- IDs],
|
Keys = [{F, ID} || F <- Filters, ID <- IDs],
|
||||||
lists:foreach(fun({F, ID}) ->
|
lists:foreach(
|
||||||
|
fun({F, ID}) ->
|
||||||
M:insert(F, ID, <<>>, Tab)
|
M:insert(F, ID, <<>>, Tab)
|
||||||
end, Keys),
|
end,
|
||||||
|
Keys
|
||||||
|
),
|
||||||
Topic = <<"dev/global/sensor">>,
|
Topic = <<"dev/global/sensor">>,
|
||||||
Matches = lists:sort(matches(M, Topic, Tab)),
|
Matches = lists:sort(matches(M, Topic, Tab)),
|
||||||
?assertEqual([<<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>, <<"dev/+/sensor/#">>,
|
?assertEqual(
|
||||||
<<"dev/global/sensor">>, <<"dev/global/sensor">>, <<"dev/global/sensor">>],
|
[
|
||||||
[emqx_topic_index:get_topic(Match) || Match <- Matches]).
|
<<"dev/+/sensor/#">>,
|
||||||
|
<<"dev/+/sensor/#">>,
|
||||||
|
<<"dev/+/sensor/#">>,
|
||||||
|
<<"dev/global/sensor">>,
|
||||||
|
<<"dev/global/sensor">>,
|
||||||
|
<<"dev/global/sensor">>
|
||||||
|
],
|
||||||
|
[emqx_topic_index:get_topic(Match) || Match <- Matches]
|
||||||
|
).
|
||||||
|
|
||||||
t_match_fast_forward(Config) ->
|
t_match_fast_forward(Config) ->
|
||||||
M = get_module(Config),
|
M = get_module(Config),
|
||||||
|
|
Loading…
Reference in New Issue