From 28bcb394d150d066e30bb78d7be3ecdf375b6681 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 21 Jul 2023 20:06:46 +0200 Subject: [PATCH] fix(topicidx): allow to return matches unique by record id --- apps/emqx/src/emqx_topic_index.erl | 132 ++++++++++++---------- apps/emqx/test/emqx_topic_index_SUITE.erl | 16 ++- 2 files changed, 88 insertions(+), 60 deletions(-) diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 9f0b5fba1..ac4901a09 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -16,18 +16,16 @@ %% @doc Topic index for matching topics to topic filters. %% -%% Works on top of ETS ordered_set table. Keys are parsed topic filters -%% with record ID appended to the end, wrapped in a tuple to disambiguate from -%% topic filter words. Existing table may be used if existing keys will not -%% collide with index keys. +%% Works on top of ETS ordered_set table. Keys are tuples constructed from +%% parsed topic filters and record IDs, wrapped in a tuple to order them +%% strictly greater than unit tuple (`{}`). Existing table may be used if +%% existing keys will not collide with index keys. %% %% Designed to effectively answer questions like: %% 1. Does any topic filter match given topic? %% 2. Which records are associated with topic filters matching given topic? -%% -%% Questions like these are _only slightly_ less effective: -%% 1. Which topic filters match given topic? -%% 2. Which record IDs are associated with topic filters matching given topic? +%% 3. Which topic filters match given topic? +%% 4. Which record IDs are associated with topic filters matching given topic? -module(emqx_topic_index). @@ -35,23 +33,23 @@ -export([insert/4]). -export([delete/3]). -export([match/2]). --export([matches/2]). +-export([matches/3]). -export([get_id/1]). -export([get_topic/1]). -export([get_record/2]). --type key(ID) :: [binary() | '+' | '#' | {ID}]. +-type key(ID) :: {[binary() | '+' | '#'], {ID}}. -type match(ID) :: key(ID). new() -> ets:new(?MODULE, [public, ordered_set, {write_concurrency, true}]). insert(Filter, ID, Record, Tab) -> - ets:insert(Tab, {emqx_topic:words(Filter) ++ [{ID}], Record}). + ets:insert(Tab, {{emqx_topic:words(Filter), {ID}}, Record}). delete(Filter, ID, Tab) -> - ets:delete(Tab, emqx_topic:words(Filter) ++ [{ID}]). + ets:delete(Tab, {emqx_topic:words(Filter), {ID}}). -spec match(emqx_types:topic(), ets:table()) -> match(_ID) | false. match(Topic, Tab) -> @@ -60,8 +58,8 @@ match(Topic, Tab) -> match(Words, RPrefix, Tab) -> Prefix = lists:reverse(RPrefix), - K = ets:next(Tab, Prefix), - case match_filter(Prefix, K, Words =/= []) of + K = ets:next(Tab, {Prefix, {}}), + case match_filter(Prefix, K, Words == []) of true -> K; stop -> @@ -74,7 +72,7 @@ match_rest(false, [W | Rest], RPrefix, Tab) -> match(Rest, [W | RPrefix], Tab); match_rest(plus, [W | Rest], RPrefix, Tab) -> case match(Rest, ['+' | RPrefix], Tab) of - Match when is_list(Match) -> + Match = {_, _} -> Match; false -> match(Rest, [W | RPrefix], Tab) @@ -82,48 +80,71 @@ match_rest(plus, [W | Rest], RPrefix, Tab) -> match_rest(_, [], _RPrefix, _Tab) -> false. --spec matches(emqx_types:topic(), ets:table()) -> [match(_ID)]. -matches(Topic, Tab) -> +-spec matches(emqx_types:topic(), ets:table(), _Opts :: [unique]) -> [match(_ID)]. +matches(Topic, Tab, Opts) -> {Words, RPrefix} = match_init(Topic), - matches(Words, RPrefix, Tab). - -matches(Words, RPrefix, Tab) -> - Prefix = lists:reverse(RPrefix), - matches(ets:next(Tab, Prefix), Prefix, Words, RPrefix, Tab). - -matches(K, Prefix, Words, RPrefix, Tab) -> - case match_filter(Prefix, K, Words =/= []) of - true -> - [K | matches(ets:next(Tab, K), Prefix, Words, RPrefix, Tab)]; - stop -> - []; - Matched -> - matches_rest(Matched, Words, RPrefix, Tab) + AccIn = + case Opts of + [unique | _] -> #{}; + [] -> [] + end, + Matches = matches(Words, RPrefix, AccIn, Tab), + case Matches of + #{} -> maps:values(Matches); + _ -> Matches end. -matches_rest(false, [W | Rest], RPrefix, Tab) -> - matches(Rest, [W | RPrefix], Tab); -matches_rest(plus, [W | Rest], RPrefix, Tab) -> - matches(Rest, ['+' | RPrefix], Tab) ++ matches(Rest, [W | RPrefix], Tab); -matches_rest(_, [], _RPrefix, _Tab) -> - []. +matches(Words, RPrefix, Acc, Tab) -> + Prefix = lists:reverse(RPrefix), + matches(ets:next(Tab, {Prefix, {}}), Prefix, Words, RPrefix, Acc, Tab). -match_filter([], [{_ID}], _IsPrefix = false) -> - % NOTE: exact match is `true` only if we match whole topic, not prefix - true; -match_filter([], ['#', {_ID}], _IsPrefix) -> +matches(K, Prefix, Words, RPrefix, Acc, Tab) -> + case match_filter(Prefix, K, Words == []) of + true -> + matches(ets:next(Tab, K), Prefix, Words, RPrefix, match_add(K, Acc), Tab); + stop -> + Acc; + Matched -> + matches_rest(Matched, Words, RPrefix, Acc, Tab) + end. + +matches_rest(false, [W | Rest], RPrefix, Acc, Tab) -> + matches(Rest, [W | RPrefix], Acc, Tab); +matches_rest(plus, [W | Rest], RPrefix, Acc, Tab) -> + NAcc = matches(Rest, ['+' | RPrefix], Acc, Tab), + matches(Rest, [W | RPrefix], NAcc, Tab); +matches_rest(_, [], _RPrefix, Acc, _Tab) -> + Acc. + +match_add(K = {_Filter, ID}, Acc = #{}) -> + Acc#{ID => K}; +match_add(K, Acc) -> + [K | Acc]. + +match_filter(Prefix, {Filter, _ID}, NotPrefix) -> + case match_filter(Prefix, Filter) of + exact -> + % NOTE: exact match is `true` only if we match whole topic, not prefix + NotPrefix; + Match -> + Match + end; +match_filter(_, '$end_of_table', _) -> + stop. + +match_filter([], []) -> + exact; +match_filter([], ['#']) -> % NOTE: naturally, '#' < '+', so this is already optimal for `match/2` true; -match_filter([], ['+' | _], _) -> +match_filter([], ['+' | _]) -> plus; -match_filter([], [_H | _], _) -> +match_filter([], [_H | _]) -> false; -match_filter([H | T1], [H | T2], IsPrefix) -> - match_filter(T1, T2, IsPrefix); -match_filter([H1 | _], [H2 | _], _) when H2 > H1 -> +match_filter([H | T1], [H | T2]) -> + match_filter(T1, T2); +match_filter([H1 | _], [H2 | _]) when H2 > H1 -> % NOTE: we're strictly past the prefix, no need to continue - stop; -match_filter(_, '$end_of_table', _) -> stop. match_init(Topic) -> @@ -137,19 +158,12 @@ match_init(Topic) -> end. -spec get_id(match(ID)) -> ID. -get_id([{ID}]) -> - ID; -get_id([_ | Rest]) -> - get_id(Rest). +get_id({_Filter, {ID}}) -> + ID. -spec get_topic(match(_ID)) -> emqx_types:topic(). -get_topic(K) -> - emqx_topic:join(cut_topic(K)). - -cut_topic([{_ID}]) -> - []; -cut_topic([W | Rest]) -> - [W | cut_topic(Rest)]. +get_topic({Filter, _ID}) -> + emqx_topic:join(Filter). -spec get_record(match(_ID), ets:table()) -> _Record. get_record(K, Tab) -> diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 98bfe48a1..b5faba4d9 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -130,11 +130,25 @@ t_match7(_) -> emqx_topic_index:insert(W, t_match7, <<>>, Tab), ?assertEqual(W, topic(match(T, Tab))). +t_match_unique(_) -> + Tab = emqx_topic_index:new(), + emqx_topic_index:insert(<<"a/b/c">>, t_match_id1, <<>>, Tab), + emqx_topic_index:insert(<<"a/b/+">>, t_match_id1, <<>>, Tab), + emqx_topic_index:insert(<<"a/b/c/+">>, t_match_id2, <<>>, Tab), + ?assertEqual( + [t_match_id1, t_match_id1], + [id(M) || M <- emqx_topic_index:matches(<<"a/b/c">>, Tab, [])] + ), + ?assertEqual( + [t_match_id1], + [id(M) || M <- emqx_topic_index:matches(<<"a/b/c">>, Tab, [unique])] + ). + match(T, Tab) -> emqx_topic_index:match(T, Tab). matches(T, Tab) -> - lists:sort(emqx_topic_index:matches(T, Tab)). + lists:sort(emqx_topic_index:matches(T, Tab, [])). id(Match) -> emqx_topic_index:get_id(Match).