From 6432c9c8fc99f3fc0388a7e8ef43bc3384f7215e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 21 Jul 2023 20:06:46 +0200 Subject: [PATCH 1/5] fix(topicidx): allow to return matches unique by record id --- apps/emqx_rule_engine/src/emqx_rule_index.erl | 128 +++++++++++------- .../test/emqx_rule_index_SUITE.erl | 58 ++++---- 2 files changed, 109 insertions(+), 77 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_index.erl b/apps/emqx_rule_engine/src/emqx_rule_index.erl index 9c16bf3ad..7a3159f9a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_index.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_index.erl @@ -16,29 +16,29 @@ %% @doc Topic index for matching topics to topic filters. %% -%% Works on top of ETS ordered_set table. Keys are parsed topic filters -%% with record ID appended to the end, wrapped in a tuple to disambiguate from -%% topic filter words. Existing table may be used if existing keys will not -%% collide with index keys. +%% Works on top of ETS ordered_set table. Keys are tuples constructed from +%% parsed topic filters and record IDs, wrapped in a tuple to order them +%% strictly greater than unit tuple (`{}`). Existing table may be used if +%% existing keys will not collide with index keys. %% %% Designed to effectively answer questions like: %% 1. Does any topic filter match given topic? %% 2. Which records are associated with topic filters matching given topic? -%% -%% Questions like these are _only slightly_ less effective: -%% 1. Which topic filters match given topic? -%% 2. Which record IDs are associated with topic filters matching given topic? +%% 3. Which topic filters match given topic? +%% 4. Which record IDs are associated with topic filters matching given topic? -module(emqx_rule_index). -export([insert/4]). -export([delete/3]). -export([match/2]). --export([matches/2]). +-export([matches/3]). +-export([get_id/1]). +-export([get_topic/1]). -export([get_record/2]). --type key(ID) :: [binary() | '+' | '#' | {ID}]. 
+-type key(ID) :: {[binary() | '+' | '#'], {ID}}. -type match(ID) :: key(ID). -ifdef(TEST). @@ -46,11 +46,10 @@ -endif. insert(Filter, ID, Record, Tab) -> - %% TODO: topic compact. see also in emqx_trie.erl - ets:insert(Tab, {emqx_topic:words(Filter) ++ [{ID}], Record}). + ets:insert(Tab, {{emqx_topic:words(Filter), {ID}}, Record}). delete(Filter, ID, Tab) -> - ets:delete(Tab, emqx_topic:words(Filter) ++ [{ID}]). + ets:delete(Tab, {emqx_topic:words(Filter), {ID}}). -spec match(emqx_types:topic(), ets:table()) -> match(_ID) | false. match(Topic, Tab) -> @@ -59,8 +58,8 @@ match(Topic, Tab) -> match(Words, RPrefix, Tab) -> Prefix = lists:reverse(RPrefix), - K = ets:next(Tab, Prefix), - case match_filter(Prefix, K, Words =/= []) of + K = ets:next(Tab, {Prefix, {}}), + case match_filter(Prefix, K, Words == []) of true -> K; stop -> @@ -73,7 +72,7 @@ match_rest(false, [W | Rest], RPrefix, Tab) -> match(Rest, [W | RPrefix], Tab); match_rest(plus, [W | Rest], RPrefix, Tab) -> case match(Rest, ['+' | RPrefix], Tab) of - Match when is_list(Match) -> + Match = {_, _} -> Match; false -> match(Rest, [W | RPrefix], Tab) @@ -81,48 +80,71 @@ match_rest(plus, [W | Rest], RPrefix, Tab) -> match_rest(_, [], _RPrefix, _Tab) -> false. --spec matches(emqx_types:topic(), ets:table()) -> [match(_ID)]. -matches(Topic, Tab) -> +-spec matches(emqx_types:topic(), ets:table(), _Opts :: [unique]) -> [match(_ID)]. +matches(Topic, Tab, Opts) -> {Words, RPrefix} = match_init(Topic), - matches(Words, RPrefix, Tab). - -matches(Words, RPrefix, Tab) -> - Prefix = lists:reverse(RPrefix), - matches(ets:next(Tab, Prefix), Prefix, Words, RPrefix, Tab). 
- -matches(K, Prefix, Words, RPrefix, Tab) -> - case match_filter(Prefix, K, Words =/= []) of - true -> - [K | matches(ets:next(Tab, K), Prefix, Words, RPrefix, Tab)]; - stop -> - []; - Matched -> - matches_rest(Matched, Words, RPrefix, Tab) + AccIn = + case Opts of + [unique | _] -> #{}; + [] -> [] + end, + Matches = matches(Words, RPrefix, AccIn, Tab), + case Matches of + #{} -> maps:values(Matches); + _ -> Matches end. -matches_rest(false, [W | Rest], RPrefix, Tab) -> - matches(Rest, [W | RPrefix], Tab); -matches_rest(plus, [W | Rest], RPrefix, Tab) -> - matches(Rest, ['+' | RPrefix], Tab) ++ matches(Rest, [W | RPrefix], Tab); -matches_rest(_, [], _RPrefix, _Tab) -> - []. +matches(Words, RPrefix, Acc, Tab) -> + Prefix = lists:reverse(RPrefix), + matches(ets:next(Tab, {Prefix, {}}), Prefix, Words, RPrefix, Acc, Tab). -match_filter([], [{_ID}], _IsPrefix = false) -> - % NOTE: exact match is `true` only if we match whole topic, not prefix - true; -match_filter([], ['#', {_ID}], _IsPrefix) -> +matches(K, Prefix, Words, RPrefix, Acc, Tab) -> + case match_filter(Prefix, K, Words == []) of + true -> + matches(ets:next(Tab, K), Prefix, Words, RPrefix, match_add(K, Acc), Tab); + stop -> + Acc; + Matched -> + matches_rest(Matched, Words, RPrefix, Acc, Tab) + end. + +matches_rest(false, [W | Rest], RPrefix, Acc, Tab) -> + matches(Rest, [W | RPrefix], Acc, Tab); +matches_rest(plus, [W | Rest], RPrefix, Acc, Tab) -> + NAcc = matches(Rest, ['+' | RPrefix], Acc, Tab), + matches(Rest, [W | RPrefix], NAcc, Tab); +matches_rest(_, [], _RPrefix, Acc, _Tab) -> + Acc. + +match_add(K = {_Filter, ID}, Acc = #{}) -> + Acc#{ID => K}; +match_add(K, Acc) -> + [K | Acc]. + +match_filter(Prefix, {Filter, _ID}, NotPrefix) -> + case match_filter(Prefix, Filter) of + exact -> + % NOTE: exact match is `true` only if we match whole topic, not prefix + NotPrefix; + Match -> + Match + end; +match_filter(_, '$end_of_table', _) -> + stop. 
+ +match_filter([], []) -> + exact; +match_filter([], ['#']) -> % NOTE: naturally, '#' < '+', so this is already optimal for `match/2` true; -match_filter([], ['+' | _], _) -> +match_filter([], ['+' | _]) -> plus; -match_filter([], [_H | _], _) -> +match_filter([], [_H | _]) -> false; -match_filter([H | T1], [H | T2], IsPrefix) -> - match_filter(T1, T2, IsPrefix); -match_filter([H1 | _], [H2 | _], _) when H2 > H1 -> +match_filter([H | T1], [H | T2]) -> + match_filter(T1, T2); +match_filter([H1 | _], [H2 | _]) when H2 > H1 -> % NOTE: we're strictly past the prefix, no need to continue - stop; -match_filter(_, '$end_of_table', _) -> stop. match_init(Topic) -> @@ -135,6 +157,14 @@ match_init(Topic) -> {Words, []} end. +-spec get_id(match(ID)) -> ID. +get_id({_Filter, {ID}}) -> + ID. + +-spec get_topic(match(_ID)) -> emqx_types:topic(). +get_topic({Filter, _ID}) -> + emqx_topic:join(Filter). + -spec get_record(match(_ID), ets:table()) -> _Record. get_record(K, Tab) -> ets:lookup_element(Tab, K, 2). diff --git a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl index cf4b67cd4..42f3f1da0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl @@ -29,8 +29,8 @@ t_insert(_) -> true = emqx_rule_index:insert(<<"sensor/1/metric/2">>, t_insert_1, <<>>, Tab), true = emqx_rule_index:insert(<<"sensor/+/#">>, t_insert_2, <<>>, Tab), true = emqx_rule_index:insert(<<"sensor/#">>, t_insert_3, <<>>, Tab), - ?assertEqual(<<"sensor/#">>, get_topic(match(<<"sensor">>, Tab))), - ?assertEqual(t_insert_3, get_id(match(<<"sensor">>, Tab))), + ?assertEqual(<<"sensor/#">>, topic(match(<<"sensor">>, Tab))), + ?assertEqual(t_insert_3, id(match(<<"sensor">>, Tab))), true = ets:delete(Tab). 
t_match(_) -> @@ -40,7 +40,7 @@ t_match(_) -> true = emqx_rule_index:insert(<<"sensor/#">>, t_match_3, <<>>, Tab), ?assertMatch( [<<"sensor/#">>, <<"sensor/+/#">>], - [get_topic(M) || M <- matches(<<"sensor/1">>, Tab)] + [topic(M) || M <- matches(<<"sensor/1">>, Tab)] ), true = ets:delete(Tab). @@ -51,7 +51,7 @@ t_match2(_) -> true = emqx_rule_index:insert(<<"+/+/#">>, t_match2_3, <<>>, Tab), ?assertEqual( [<<"#">>, <<"+/#">>, <<"+/+/#">>], - [get_topic(M) || M <- matches(<<"a/b/c">>, Tab)] + [topic(M) || M <- matches(<<"a/b/c">>, Tab)] ), ?assertEqual( false, @@ -79,7 +79,7 @@ t_match3(_) -> end, ?assertEqual( t_match3_sys, - get_id(match(<<"$SYS/a/b/c">>, Tab)) + id(match(<<"$SYS/a/b/c">>, Tab)) ), true = ets:delete(Tab). @@ -92,11 +92,11 @@ t_match4(_) -> ), ?assertEqual( [<<"/#">>, <<"/+">>], - [get_topic(M) || M <- matches(<<"/">>, Tab)] + [topic(M) || M <- matches(<<"/">>, Tab)] ), ?assertEqual( [<<"/#">>, <<"/+/a/b/c">>], - [get_topic(M) || M <- matches(<<"/0/a/b/c">>, Tab)] + [topic(M) || M <- matches(<<"/0/a/b/c">>, Tab)] ), true = ets:delete(Tab). @@ -114,11 +114,11 @@ t_match5(_) -> ), ?assertEqual( [<<"#">>, <>], - [get_topic(M) || M <- matches(T, Tab)] + [topic(M) || M <- matches(T, Tab)] ), ?assertEqual( [<<"#">>, <>, <>], - [get_topic(M) || M <- matches(<>, Tab)] + [topic(M) || M <- matches(<>, Tab)] ), true = ets:delete(Tab). @@ -127,7 +127,7 @@ t_match6(_) -> T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>, emqx_rule_index:insert(W, ID = t_match6, <<>>, Tab), - ?assertEqual(ID, get_id(match(T, Tab))), + ?assertEqual(ID, id(match(T, Tab))), true = ets:delete(Tab). 
t_match7(_) -> @@ -135,9 +135,23 @@ t_match7(_) -> T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>, W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>, emqx_rule_index:insert(W, t_match7, <<>>, Tab), - ?assertEqual(W, get_topic(match(T, Tab))), + ?assertEqual(W, topic(match(T, Tab))), true = ets:delete(Tab). +t_match_unique(_) -> + Tab = new(), + emqx_rule_index:insert(<<"a/b/c">>, t_match_id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/+">>, t_match_id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/c/+">>, t_match_id2, <<>>, Tab), + ?assertEqual( + [t_match_id1, t_match_id1], + [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [])] + ), + ?assertEqual( + [t_match_id1], + [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [unique])] + ). + new() -> ets:new(?MODULE, [public, ordered_set, {write_concurrency, true}]). @@ -145,22 +159,10 @@ match(T, Tab) -> emqx_rule_index:match(T, Tab). matches(T, Tab) -> - lists:sort(emqx_rule_index:matches(T, Tab)). + lists:sort(emqx_rule_index:matches(T, Tab, [])). --spec get_id(emqx_rule_index:match(ID)) -> ID. -get_id([{ID}]) -> - ID; -get_id([_ | Rest]) -> - get_id(Rest). +id(Match) -> + emqx_rule_index:get_id(Match). --spec get_topic(emqx_rule_index:match(_ID)) -> emqx_types:topic(). -get_topic(K) -> - emqx_topic:join(cut_topic(K)). - -cut_topic(K) -> - cut_topic(K, []). - -cut_topic([{_ID}], Acc) -> - lists:reverse(Acc); -cut_topic([W | Rest], Acc) -> - cut_topic(Rest, [W | Acc]). +topic(Match) -> + emqx_rule_index:get_topic(Match). 
From 04960383616ceb7eecd4bc04e70078412e19fbf4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 21 Jul 2023 20:09:17 +0200 Subject: [PATCH 2/5] fix(ruleeng): ensure topic index matched rules evaluated once --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index d92931d77..dd4b52d44 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -226,7 +226,7 @@ get_rules_ordered_by_ts() -> get_rules_for_topic(Topic) -> [ emqx_rule_index:get_record(M, ?RULE_TOPIC_INDEX) - || M <- emqx_rule_index:matches(Topic, ?RULE_TOPIC_INDEX) + || M <- emqx_rule_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. From dcf4819c044221147e835da873e8e0997ffe6a3d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 24 Jul 2023 19:30:34 +0800 Subject: [PATCH 3/5] test(rule): add tests to ensure the rules ordering --- apps/emqx_rule_engine/src/emqx_rule_index.erl | 8 +++- .../test/emqx_rule_engine_SUITE.erl | 40 +++++++++++++++++++ .../test/emqx_rule_index_SUITE.erl | 10 +++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_index.erl b/apps/emqx_rule_engine/src/emqx_rule_index.erl index 7a3159f9a..70564f62c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_index.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_index.erl @@ -89,9 +89,13 @@ matches(Topic, Tab, Opts) -> [] -> [] end, Matches = matches(Words, RPrefix, AccIn, Tab), + %% return rules ordered by Rule ID case Matches of - #{} -> maps:values(Matches); - _ -> Matches + #{} -> + maps:values(Matches); + _ -> + F = fun({_, {ID1}}, {_, {ID2}}) -> ID1 < ID2 end, + lists:sort(F, Matches) end. 
matches(Words, RPrefix, Acc, Tab) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c8bebab99..87563f660 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -58,6 +58,7 @@ groups() -> t_create_existing_rule, t_get_rules_for_topic, t_get_rules_for_topic_2, + t_get_rules_for_topic_3, t_get_rules_with_same_event, t_get_rule_ids_by_action, t_ensure_action_removed @@ -399,6 +400,45 @@ t_get_rules_for_topic_2(_Config) -> ]), ok. +t_get_rules_for_topic_3(_Config) -> + ok = create_rules( + [ + make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/#\"">>), + make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/+\"">>), + make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>), + make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/1\"">>), + make_simple_rule( + <<"rule-debug-1">>, + <<"select * from \"simple/2\", \"simple/+\", \"simple/3\"">> + ) + ] + ), + Rules1 = get_rules_for_topic_in_e510_impl(<<"simple/1">>), + Rules2 = emqx_rule_engine:get_rules_for_topic(<<"simple/1">>), + %% assert, ensure the order of rules is the same as e5.1.0 + ?assertEqual(Rules1, Rules2), + ?assertEqual( + [<<"rule-debug-1">>, <<"rule-debug-2">>, <<"rule-debug-4">>, <<"rule-debug-5">>], + [Id || #{id := Id} <- Rules1] + ), + + ok = delete_rules_by_ids([ + <<"rule-debug-1">>, + <<"rule-debug-2">>, + <<"rule-debug-3">>, + <<"rule-debug-4">>, + <<"rule-debug-5">>, + <<"rule-debug-6">> + ]), + ok. + +get_rules_for_topic_in_e510_impl(Topic) -> + [ + Rule + || Rule = #{from := From} <- emqx_rule_engine:get_rules(), + emqx_topic:match_any(Topic, From) + ]. 
+ t_get_rules_with_same_event(_Config) -> PubT = <<"simple/1">>, PubN = length(emqx_rule_engine:get_rules_with_same_event(PubT)), diff --git a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl index 42f3f1da0..76ad1cda6 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl @@ -152,6 +152,16 @@ t_match_unique(_) -> [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [unique])] ). +t_match_ordering(_) -> + Tab = new(), + emqx_rule_index:insert(<<"a/b/+">>, t_match_id2, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/c">>, t_match_id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/#">>, t_match_id3, <<>>, Tab), + Ids1 = [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [])], + Ids2 = [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [unique])], + ?assertEqual(Ids1, Ids2), + ?assertEqual([t_match_id1, t_match_id2, t_match_id3], Ids1). + new() -> ets:new(?MODULE, [public, ordered_set, {write_concurrency, true}]). From e630331de14ca266d5bb9d7bb1c187acc1e3cf50 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 24 Jul 2023 23:04:53 +0800 Subject: [PATCH 4/5] fix(rule): fix a querying problem when 'a/b' and 'a/b/#' exist at the same time. When using `ets:next` to query the next level of topic words, we should prioritize the next level of '#', '+'. 
--- apps/emqx_rule_engine/src/emqx_rule_index.erl | 11 +++++++++- .../test/emqx_rule_engine_SUITE.erl | 3 +-- .../test/emqx_rule_index_SUITE.erl | 20 ++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_index.erl b/apps/emqx_rule_engine/src/emqx_rule_index.erl index 70564f62c..16f23896a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_index.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_index.erl @@ -114,6 +114,10 @@ matches(K, Prefix, Words, RPrefix, Acc, Tab) -> matches_rest(false, [W | Rest], RPrefix, Acc, Tab) -> matches(Rest, [W | RPrefix], Acc, Tab); +matches_rest({false, exact}, [W | Rest], RPrefix, Acc, Tab) -> + NAcc1 = matches(Rest, ['#' | RPrefix], Acc, Tab), + NAcc2 = matches(Rest, ['+' | RPrefix], NAcc1, Tab), + matches(Rest, [W | RPrefix], NAcc2, Tab); matches_rest(plus, [W | Rest], RPrefix, Acc, Tab) -> NAcc = matches(Rest, ['+' | RPrefix], Acc, Tab), matches(Rest, [W | RPrefix], NAcc, Tab); @@ -129,7 +133,12 @@ match_filter(Prefix, {Filter, _ID}, NotPrefix) -> case match_filter(Prefix, Filter) of exact -> % NOTE: exact match is `true` only if we match whole topic, not prefix - NotPrefix; + case NotPrefix of + true -> + true; + false -> + {false, exact} + end; Match -> Match end; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 87563f660..2b70ea8ae 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -427,8 +427,7 @@ t_get_rules_for_topic_3(_Config) -> <<"rule-debug-2">>, <<"rule-debug-3">>, <<"rule-debug-4">>, - <<"rule-debug-5">>, - <<"rule-debug-6">> + <<"rule-debug-5">> ]), ok. 
diff --git a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl index 76ad1cda6..027d85b7e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl @@ -162,8 +162,26 @@ t_match_ordering(_) -> ?assertEqual(Ids1, Ids2), ?assertEqual([t_match_id1, t_match_id2, t_match_id3], Ids1). +t_match_wildcards(_) -> + Tab = new(), + emqx_rule_index:insert(<<"a/b">>, id1, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/#">>, id2, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/#">>, id3, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/c">>, id4, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/+">>, id5, <<>>, Tab), + emqx_rule_index:insert(<<"a/b/d">>, id6, <<>>, Tab), + emqx_rule_index:insert(<<"a/+/+">>, id7, <<>>, Tab), + emqx_rule_index:insert(<<"a/+/#">>, id8, <<>>, Tab), + + Rules = [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [])], + ?assertEqual([id2, id3, id4, id5, id7, id8], Rules), + + Rules1 = [id(M) || M <- emqx_rule_index:matches(<<"a/b">>, Tab, [])], + ?assertEqual([id1, id2, id3, id8], Rules1), + ok. + new() -> - ets:new(?MODULE, [public, ordered_set, {write_concurrency, true}]). + ets:new(?MODULE, [public, ordered_set, {read_concurrency, true}]). match(T, Tab) -> emqx_rule_index:match(T, Tab). 
From d05a5cfe0fc4ebd39ef46653b115fc7f28545e87 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 25 Jul 2023 14:28:50 +0800 Subject: [PATCH 5/5] fix(rule): fix the `matches/2` for some edge cases --- apps/emqx_rule_engine/src/emqx_rule_index.erl | 8 +- .../test/emqx_rule_index_SUITE.erl | 142 ++++++++++++++++-- 2 files changed, 132 insertions(+), 18 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_index.erl b/apps/emqx_rule_engine/src/emqx_rule_index.erl index 16f23896a..4dd395b94 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_index.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_index.erl @@ -114,8 +114,8 @@ matches(K, Prefix, Words, RPrefix, Acc, Tab) -> matches_rest(false, [W | Rest], RPrefix, Acc, Tab) -> matches(Rest, [W | RPrefix], Acc, Tab); -matches_rest({false, exact}, [W | Rest], RPrefix, Acc, Tab) -> - NAcc1 = matches(Rest, ['#' | RPrefix], Acc, Tab), +matches_rest(sharp, [W | Rest], RPrefix, Acc, Tab) -> + NAcc1 = matches([], ['#' | RPrefix], Acc, Tab), NAcc2 = matches(Rest, ['+' | RPrefix], NAcc1, Tab), matches(Rest, [W | RPrefix], NAcc2, Tab); matches_rest(plus, [W | Rest], RPrefix, Acc, Tab) -> @@ -137,7 +137,7 @@ match_filter(Prefix, {Filter, _ID}, NotPrefix) -> true -> true; false -> - {false, exact} + sharp end; Match -> Match @@ -147,6 +147,8 @@ match_filter(_, '$end_of_table', _) -> match_filter([], []) -> exact; +match_filter([], ['' | _]) -> + sharp; match_filter([], ['#']) -> % NOTE: naturally, '#' < '+', so this is already optimal for `match/2` true; diff --git a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl index 027d85b7e..8a65ee8e8 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_index_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). 
all() -> @@ -162,23 +163,106 @@ t_match_ordering(_) -> ?assertEqual(Ids1, Ids2), ?assertEqual([t_match_id1, t_match_id2, t_match_id3], Ids1). -t_match_wildcards(_) -> - Tab = new(), - emqx_rule_index:insert(<<"a/b">>, id1, <<>>, Tab), - emqx_rule_index:insert(<<"a/b/#">>, id2, <<>>, Tab), - emqx_rule_index:insert(<<"a/b/#">>, id3, <<>>, Tab), - emqx_rule_index:insert(<<"a/b/c">>, id4, <<>>, Tab), - emqx_rule_index:insert(<<"a/b/+">>, id5, <<>>, Tab), - emqx_rule_index:insert(<<"a/b/d">>, id6, <<>>, Tab), - emqx_rule_index:insert(<<"a/+/+">>, id7, <<>>, Tab), - emqx_rule_index:insert(<<"a/+/#">>, id8, <<>>, Tab), +t_match_wildcard_edge_cases(_) -> + CommonTopics = [ + <<"a/b">>, + <<"a/b/#">>, + <<"a/b/#">>, + <<"a/b/c">>, + <<"a/b/+">>, + <<"a/b/d">>, + <<"a/+/+">>, + <<"a/+/#">> + ], + Datasets = + [ + %% Topics, TopicName, Results + {CommonTopics, <<"a/b/c">>, [2, 3, 4, 5, 7, 8]}, + {CommonTopics, <<"a/b">>, [1, 2, 3, 8]}, + {[<<"+/b/c">>, <<"/">>], <<"a/b/c">>, [1]}, + {[<<"#">>, <<"/">>], <<"a">>, [1]}, + {[<<"/">>, <<"+">>], <<"a">>, [2]} + ], + F = fun({Topics, TopicName, Expected}) -> + Tab = new(), + _ = lists:foldl( + fun(T, N) -> + emqx_rule_index:insert(T, N, <<>>, Tab), + N + 1 + end, + 1, + Topics + ), + Results = [id(M) || M <- emqx_rule_index:matches(TopicName, Tab, [unique])], + case Results == Expected of + true -> + ets:delete(Tab); + false -> + ct:pal( + "Base topics: ~p~n" + "Topic name: ~p~n" + "Index results: ~p~n" + "Expected results:: ~p~n", + [Topics, TopicName, Results, Expected] + ), + error(bad_matches) + end + end, + lists:foreach(F, Datasets). - Rules = [id(M) || M <- emqx_rule_index:matches(<<"a/b/c">>, Tab, [])], - ?assertEqual([id2, id3, id4, id5, id7, id8], Rules), +t_prop_matches(_) -> + ?assert( + proper:quickcheck( + topic_matches_prop(), + [{max_size, 100}, {numtests, 100}] + ) + ). - Rules1 = [id(M) || M <- emqx_rule_index:matches(<<"a/b">>, Tab, [])], - ?assertEqual([id1, id2, id3, id8], Rules1), - ok. 
+topic_matches_prop() -> + ?FORALL( + Topics0, + list(emqx_proper_types:topic()), + begin + Tab = new(), + Topics = lists:filter(fun(Topic) -> Topic =/= <<>> end, Topics0), + lists:foldl( + fun(Topic, N) -> + true = emqx_rule_index:insert(Topic, N, <<>>, Tab), + N + 1 + end, + 1, + Topics + ), + lists:foreach( + fun(Topic0) -> + Topic = topic_filter_to_topic_name(Topic0), + Ids1 = [ + emqx_rule_index:get_id(R) + || R <- emqx_rule_index:matches(Topic, Tab, [unique]) + ], + Ids2 = topic_matches(Topic, Topics), + case Ids2 == Ids1 of + true -> + ok; + false -> + ct:pal( + "Base topics: ~p~n" + "Topic name: ~p~n" + "Index results: ~p~n" + "Topic match results:: ~p~n", + [Topics, Topic, Ids1, Ids2] + ), + error(bad_matches) + end + end, + Topics + ), + true + end + ). + +%%-------------------------------------------------------------------- +%% helpers new() -> ets:new(?MODULE, [public, ordered_set, {read_concurrency, true}]). @@ -194,3 +278,31 @@ id(Match) -> topic(Match) -> emqx_rule_index:get_topic(Match). + +topic_filter_to_topic_name(Topic) when is_binary(Topic) -> + topic_filter_to_topic_name(emqx_topic:words(Topic)); +topic_filter_to_topic_name(Words) when is_list(Words) -> + topic_filter_to_topic_name(Words, []). + +topic_filter_to_topic_name([], Acc) -> + emqx_topic:join(lists:reverse(Acc)); +topic_filter_to_topic_name(['#' | _Rest], Acc) -> + case rand:uniform(2) of + 1 -> emqx_topic:join(lists:reverse(Acc)); + _ -> emqx_topic:join(lists:reverse([<<"_sharp">> | Acc])) + end; +topic_filter_to_topic_name(['+' | Rest], Acc) -> + topic_filter_to_topic_name(Rest, [<<"_plus">> | Acc]); +topic_filter_to_topic_name([H | Rest], Acc) -> + topic_filter_to_topic_name(Rest, [H | Acc]). + +topic_matches(Topic, Topics0) -> + Topics = lists:zip(lists:seq(1, length(Topics0)), Topics0), + lists:sort( + lists:filtermap( + fun({Id, Topic0}) -> + emqx_topic:match(Topic, Topic0) andalso {true, Id} + end, + Topics + ) + ).