diff --git a/apps/emqx_rule_engine/src/emqx_rule_index.erl b/apps/emqx_rule_engine/src/emqx_rule_index.erl index 9c16bf3ad..7a3159f9a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_index.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_index.erl @@ -16,29 +16,29 @@ %% @doc Topic index for matching topics to topic filters. %% -%% Works on top of ETS ordered_set table. Keys are parsed topic filters -%% with record ID appended to the end, wrapped in a tuple to disambiguate from -%% topic filter words. Existing table may be used if existing keys will not -%% collide with index keys. +%% Works on top of ETS ordered_set table. Keys are tuples constructed from +%% parsed topic filters and record IDs, wrapped in a tuple to order them +%% strictly greater than unit tuple (`{}`). Existing table may be used if +%% existing keys will not collide with index keys. %% %% Designed to effectively answer questions like: %% 1. Does any topic filter match given topic? %% 2. Which records are associated with topic filters matching given topic? -%% -%% Questions like these are _only slightly_ less effective: -%% 1. Which topic filters match given topic? -%% 2. Which record IDs are associated with topic filters matching given topic? +%% 3. Which topic filters match given topic? +%% 4. Which record IDs are associated with topic filters matching given topic? -module(emqx_rule_index). -export([insert/4]). -export([delete/3]). -export([match/2]). --export([matches/2]). +-export([matches/3]). +-export([get_id/1]). +-export([get_topic/1]). -export([get_record/2]). --type key(ID) :: [binary() | '+' | '#' | {ID}]. +-type key(ID) :: {[binary() | '+' | '#'], {ID}}. -type match(ID) :: key(ID). -ifdef(TEST). @@ -46,11 +46,10 @@ -endif. insert(Filter, ID, Record, Tab) -> - %% TODO: topic compact. see also in emqx_trie.erl - ets:insert(Tab, {emqx_topic:words(Filter) ++ [{ID}], Record}). + ets:insert(Tab, {{emqx_topic:words(Filter), {ID}}, Record}). 
delete(Filter, ID, Tab) -> - ets:delete(Tab, emqx_topic:words(Filter) ++ [{ID}]). + ets:delete(Tab, {emqx_topic:words(Filter), {ID}}). -spec match(emqx_types:topic(), ets:table()) -> match(_ID) | false. match(Topic, Tab) -> @@ -59,8 +58,8 @@ match(Topic, Tab) -> match(Words, RPrefix, Tab) -> Prefix = lists:reverse(RPrefix), - K = ets:next(Tab, Prefix), - case match_filter(Prefix, K, Words =/= []) of + K = ets:next(Tab, {Prefix, {}}), + case match_filter(Prefix, K, Words == []) of true -> K; stop -> @@ -73,7 +72,7 @@ match_rest(false, [W | Rest], RPrefix, Tab) -> match(Rest, [W | RPrefix], Tab); match_rest(plus, [W | Rest], RPrefix, Tab) -> case match(Rest, ['+' | RPrefix], Tab) of - Match when is_list(Match) -> + Match = {_, _} -> Match; false -> match(Rest, [W | RPrefix], Tab) @@ -81,48 +80,71 @@ match_rest(plus, [W | Rest], RPrefix, Tab) -> match_rest(_, [], _RPrefix, _Tab) -> false. --spec matches(emqx_types:topic(), ets:table()) -> [match(_ID)]. -matches(Topic, Tab) -> +-spec matches(emqx_types:topic(), ets:table(), _Opts :: [unique]) -> [match(_ID)]. +matches(Topic, Tab, Opts) -> {Words, RPrefix} = match_init(Topic), - matches(Words, RPrefix, Tab). - -matches(Words, RPrefix, Tab) -> - Prefix = lists:reverse(RPrefix), - matches(ets:next(Tab, Prefix), Prefix, Words, RPrefix, Tab). - -matches(K, Prefix, Words, RPrefix, Tab) -> - case match_filter(Prefix, K, Words =/= []) of - true -> - [K | matches(ets:next(Tab, K), Prefix, Words, RPrefix, Tab)]; - stop -> - []; - Matched -> - matches_rest(Matched, Words, RPrefix, Tab) + AccIn = + case Opts of + [unique | _] -> #{}; + [] -> [] + end, + Matches = matches(Words, RPrefix, AccIn, Tab), + case Matches of + #{} -> maps:values(Matches); + _ -> Matches end. -matches_rest(false, [W | Rest], RPrefix, Tab) -> - matches(Rest, [W | RPrefix], Tab); -matches_rest(plus, [W | Rest], RPrefix, Tab) -> - matches(Rest, ['+' | RPrefix], Tab) ++ matches(Rest, [W | RPrefix], Tab); -matches_rest(_, [], _RPrefix, _Tab) -> - []. 
+matches(Words, RPrefix, Acc, Tab) -> + Prefix = lists:reverse(RPrefix), + matches(ets:next(Tab, {Prefix, {}}), Prefix, Words, RPrefix, Acc, Tab). -match_filter([], [{_ID}], _IsPrefix = false) -> - % NOTE: exact match is `true` only if we match whole topic, not prefix - true; -match_filter([], ['#', {_ID}], _IsPrefix) -> +matches(K, Prefix, Words, RPrefix, Acc, Tab) -> + case match_filter(Prefix, K, Words == []) of + true -> + matches(ets:next(Tab, K), Prefix, Words, RPrefix, match_add(K, Acc), Tab); + stop -> + Acc; + Matched -> + matches_rest(Matched, Words, RPrefix, Acc, Tab) + end. + +matches_rest(false, [W | Rest], RPrefix, Acc, Tab) -> + matches(Rest, [W | RPrefix], Acc, Tab); +matches_rest(plus, [W | Rest], RPrefix, Acc, Tab) -> + NAcc = matches(Rest, ['+' | RPrefix], Acc, Tab), + matches(Rest, [W | RPrefix], NAcc, Tab); +matches_rest(_, [], _RPrefix, Acc, _Tab) -> + Acc. + +match_add(K = {_Filter, ID}, Acc = #{}) -> + Acc#{ID => K}; +match_add(K, Acc) -> + [K | Acc]. + +match_filter(Prefix, {Filter, _ID}, NotPrefix) -> + case match_filter(Prefix, Filter) of + exact -> + % NOTE: exact match is `true` only if we match whole topic, not prefix + NotPrefix; + Match -> + Match + end; +match_filter(_, '$end_of_table', _) -> + stop. + +match_filter([], []) -> + exact; +match_filter([], ['#']) -> % NOTE: naturally, '#' < '+', so this is already optimal for `match/2` true; -match_filter([], ['+' | _], _) -> +match_filter([], ['+' | _]) -> plus; -match_filter([], [_H | _], _) -> +match_filter([], [_H | _]) -> false; -match_filter([H | T1], [H | T2], IsPrefix) -> - match_filter(T1, T2, IsPrefix); -match_filter([H1 | _], [H2 | _], _) when H2 > H1 -> +match_filter([H | T1], [H | T2]) -> + match_filter(T1, T2); +match_filter([H1 | _], [H2 | _]) when H2 > H1 -> % NOTE: we're strictly past the prefix, no need to continue - stop; -match_filter(_, '$end_of_table', _) -> stop. match_init(Topic) -> @@ -135,6 +157,14 @@ match_init(Topic) -> {Words, []} end. 
+-spec get_id(match(ID)) -> ID. +get_id({_Filter, {ID}}) -> + ID. + +-spec get_topic(match(_ID)) -> emqx_types:topic(). +get_topic({Filter, _ID}) -> + emqx_topic:join(Filter). + -spec get_record(match(_ID), ets:table()) -> _Record. get_record(K, Tab) -> ets:lookup_element(Tab, K, 2). diff --git a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl index cf4b67cd4..42f3f1da0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl @@ -29,8 +29,8 @@ t_insert(_) -> true = emqx_rule_index:insert(<<"sensor/1/metric/2">>, t_insert_1, <<>>, Tab), true = emqx_rule_index:insert(<<"sensor/+/#">>, t_insert_2, <<>>, Tab), true = emqx_rule_index:insert(<<"sensor/#">>, t_insert_3, <<>>, Tab), - ?assertEqual(<<"sensor/#">>, get_topic(match(<<"sensor">>, Tab))), - ?assertEqual(t_insert_3, get_id(match(<<"sensor">>, Tab))), + ?assertEqual(<<"sensor/#">>, topic(match(<<"sensor">>, Tab))), + ?assertEqual(t_insert_3, id(match(<<"sensor">>, Tab))), true = ets:delete(Tab). t_match(_) -> @@ -40,7 +40,7 @@ t_match(_) -> true = emqx_rule_index:insert(<<"sensor/#">>, t_match_3, <<>>, Tab), ?assertMatch( [<<"sensor/#">>, <<"sensor/+/#">>], - [get_topic(M) || M <- matches(<<"sensor/1">>, Tab)] + [topic(M) || M <- matches(<<"sensor/1">>, Tab)] ), true = ets:delete(Tab). @@ -51,7 +51,7 @@ t_match2(_) -> true = emqx_rule_index:insert(<<"+/+/#">>, t_match2_3, <<>>, Tab), ?assertEqual( [<<"#">>, <<"+/#">>, <<"+/+/#">>], - [get_topic(M) || M <- matches(<<"a/b/c">>, Tab)] + [topic(M) || M <- matches(<<"a/b/c">>, Tab)] ), ?assertEqual( false, @@ -79,7 +79,7 @@ t_match3(_) -> end, ?assertEqual( t_match3_sys, - get_id(match(<<"$SYS/a/b/c">>, Tab)) + id(match(<<"$SYS/a/b/c">>, Tab)) ), true = ets:delete(Tab). 
@@ -92,11 +92,11 @@ t_match4(_) -> ), ?assertEqual( [<<"/#">>, <<"/+">>], - [get_topic(M) || M <- matches(<<"/">>, Tab)] + [topic(M) || M <- matches(<<"/">>, Tab)] ), ?assertEqual( [<<"/#">>, <<"/+/a/b/c">>], - [get_topic(M) || M <- matches(<<"/0/a/b/c">>, Tab)] + [topic(M) || M <- matches(<<"/0/a/b/c">>, Tab)] ), true = ets:delete(Tab). @@ -114,11 +114,11 @@ t_match5(_) -> ), ?assertEqual( [<<"#">>, <<T/binary, "/#">>], - [get_topic(M) || M <- matches(T, Tab)] + [topic(M) || M <- matches(T, Tab)] ), ?assertEqual( [<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>], - [get_topic(M) || M <- matches(<<T/binary, "/1">>, Tab)] + [topic(M) || M <- matches(<<T/binary, "/1">>, Tab)] ), true = ets:delete(Tab). @@ -127,7 +127,7 @@ t_match6(_) -> T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>, emqx_rule_index:insert(W, ID = t_match6, <<>>, Tab), - ?assertEqual(ID, get_id(match(T, Tab))), + ?assertEqual(ID, id(match(T, Tab))), true = ets:delete(Tab). t_match7(_) -> @@ -135,9 +135,23 @@ t_match7(_) -> T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>, emqx_rule_index:insert(W, t_match7, <<>>, Tab), - ?assertEqual(W, get_topic(match(T, Tab))), + ?assertEqual(W, topic(match(T, Tab))), true = ets:delete(Tab). +t_match_unique(_) -> + Tab = new(), + emqx_rule_index:insert(<<"a/b/c">>, t_match_id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/+">>, t_match_id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/c/+">>, t_match_id2, <<>>, Tab), + ?assertEqual( + [t_match_id1, t_match_id1], + [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [])] + ), + ?assertEqual( + [t_match_id1], + [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [unique])] + ). + new() -> ets:new(?MODULE, [public, ordered_set, {write_concurrency, true}]). @@ -145,22 +159,10 @@ match(T, Tab) -> emqx_rule_index:match(T, Tab). matches(T, Tab) -> - lists:sort(emqx_rule_index:matches(T, Tab)).
+ lists:sort(emqx_rule_index:matches(T, Tab, [])). --spec get_id(emqx_rule_index:match(ID)) -> ID. -get_id([{ID}]) -> - ID; -get_id([_ | Rest]) -> - get_id(Rest). +id(Match) -> + emqx_rule_index:get_id(Match). --spec get_topic(emqx_rule_index:match(_ID)) -> emqx_types:topic(). -get_topic(K) -> - emqx_topic:join(cut_topic(K)). - -cut_topic(K) -> - cut_topic(K, []). - -cut_topic([{_ID}], Acc) -> - lists:reverse(Acc); -cut_topic([W | Rest], Acc) -> - cut_topic(Rest, [W | Acc]). +topic(Match) -> + emqx_rule_index:get_topic(Match).