diff --git a/apps/emqx/src/emqx_topic_gbt.erl b/apps/emqx/src/emqx_topic_gbt.erl index dade70cee..063cba21d 100644 --- a/apps/emqx/src/emqx_topic_gbt.erl +++ b/apps/emqx/src/emqx_topic_gbt.erl @@ -29,8 +29,8 @@ -export([get_topic/1]). -export([get_record/2]). --type word() :: binary() | '+' | '#'. --type key(ID) :: {[word()], {ID}}. +-type key(ID) :: emqx_trie_search:key(ID). +-type words() :: emqx_trie_search:words(). -type match(ID) :: key(ID). -type name() :: any(). @@ -50,7 +50,7 @@ new(Name) -> %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic(), _ID, _Record, name()) -> true. +-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true. insert(Filter, ID, Record, Name) -> Tree = gbt(Name), Key = key(Filter, ID), @@ -59,7 +59,7 @@ insert(Filter, ID, Record, Name) -> %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic(), _ID, name()) -> true. +-spec delete(emqx_types:topic() | words(), _ID, name()) -> true. delete(Filter, ID, Name) -> Tree = gbt(Name), Key = key(Filter, ID), diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 5db8ce7cc..59dfdfeab 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -32,6 +32,7 @@ -type key(ID) :: emqx_trie_search:key(ID). -type match(ID) :: key(ID). +-type words() :: emqx_trie_search:words(). %% @doc Create a new ETS table suitable for topic index. %% Usable mostly for testing purposes. @@ -42,18 +43,18 @@ new() -> %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true. +-spec insert(emqx_types:topic() | words(), _ID, _Record, ets:table()) -> true. insert(Filter, ID, Record, Tab) -> Key = make_key(Filter, ID), true = ets:insert(Tab, {Key, Record}). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic(), _ID, ets:table()) -> true. +-spec delete(emqx_types:topic() | words(), _ID, ets:table()) -> true. delete(Filter, ID, Tab) -> ets:delete(Tab, make_key(Filter, ID)). --spec make_key(emqx_types:topic(), ID) -> key(ID). +-spec make_key(emqx_types:topic() | words(), ID) -> key(ID). make_key(TopicOrFilter, ID) -> emqx_trie_search:make_key(TopicOrFilter, ID). diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index b774e1459..c8c088b58 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -98,24 +98,24 @@ -module(emqx_trie_search). --export([make_key/2]). +-export([make_key/2, filter/1]). -export([match/2, matches/3, get_id/1, get_topic/1]). --export_type([key/1, word/0, nextf/0, opts/0]). +-export_type([key/1, word/0, words/0, nextf/0, opts/0]). -define(END, '$end_of_table'). -type word() :: binary() | '+' | '#'. +-type words() :: [word()]. -type base_key() :: {binary() | [word()], {}}. -type key(ID) :: {binary() | [word()], {ID}}. -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)). -type opts() :: [unique | return_first]. %% @doc Make a search-key for the given topic. --spec make_key(emqx_types:topic(), ID) -> key(ID). +-spec make_key(emqx_types:topic() | words(), ID) -> key(ID). make_key(Topic, ID) when is_binary(Topic) -> - Words = filter_words(Topic), - case emqx_topic:wildcard(Words) of - true -> + case filter(Topic) of + Words when is_list(Words) -> %% it's a wildcard {Words, {ID}}; false -> @@ -123,7 +123,15 @@ make_key(Topic, ID) when is_binary(Topic) -> %% because they can be found with direct lookups. %% it is also more compact in memory. {Topic, {ID}} - end. + end; +make_key(Words, ID) when is_list(Words) -> + {Words, {ID}}. + +%% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter. +-spec filter(emqx_types:topic()) -> words() | false. +filter(Topic) -> + Words = filter_words(Topic), + emqx_topic:wildcard(Words) andalso Words. %% @doc Extract record ID from the match. -spec get_id(key(ID)) -> ID. @@ -325,6 +333,7 @@ filter_words(Topic) when is_binary(Topic) -> % `match_filter/3` expects. [word(W, filter) || W <- emqx_topic:tokens(Topic)]. +-spec topic_words(emqx_types:topic()) -> [binary()]. topic_words(Topic) when is_binary(Topic) -> [word(W, topic) || W <- emqx_topic:tokens(Topic)]. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 08056a16f..9df9743f1 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -57,6 +57,17 @@ t_insert(Config) -> ?assertEqual(<<"sensor/#">>, topic(match(M, <<"sensor">>, Tab))), ?assertEqual(t_insert_3, id(match(M, <<"sensor">>, Tab))). +t_insert_filter(Config) -> + M = get_module(Config), + Tab = M:new(), + Topic = <<"sensor/+/metric//#">>, + true = M:insert(Topic, 1, <<>>, Tab), + true = M:insert(emqx_trie_search:filter(Topic), 2, <<>>, Tab), + ?assertEqual( + [Topic, Topic], + [topic(X) || X <- matches(M, <<"sensor/1/metric//2">>, Tab)] + ). + t_match(Config) -> M = get_module(Config), Tab = M:new(), diff --git a/apps/emqx/test/emqx_trie_search_tests.erl b/apps/emqx/test/emqx_trie_search_tests.erl index 75994131d..d78347de6 100644 --- a/apps/emqx/test/emqx_trie_search_tests.erl +++ b/apps/emqx/test/emqx_trie_search_tests.erl @@ -18,15 +18,30 @@ -include_lib("eunit/include/eunit.hrl"). -topic_validation_test() -> +-import(emqx_trie_search, [filter/1]). + +filter_test_() -> + [ + ?_assertEqual( + [<<"sensor">>, '+', <<"metric">>, <<>>, '#'], + filter(<<"sensor/+/metric//#">>) + ), + ?_assertEqual( + false, + filter(<<"sensor/1/metric//42">>) + ) + ]. + +topic_validation_test_() -> NextF = fun(_) -> '$end_of_table' end, Call = fun(Topic) -> emqx_trie_search:match(Topic, NextF) end, - ?assertError(badarg, Call(<<"+">>)), - ?assertError(badarg, Call(<<"#">>)), - ?assertError(badarg, Call(<<"a/+/b">>)), - ?assertError(badarg, Call(<<"a/b/#">>)), - ?assertEqual(false, Call(<<"a/b/b+">>)), - ?assertEqual(false, Call(<<"a/b/c#">>)), - ok. + [ + ?_assertError(badarg, Call(<<"+">>)), + ?_assertError(badarg, Call(<<"#">>)), + ?_assertError(badarg, Call(<<"a/+/b">>)), + ?_assertError(badarg, Call(<<"a/b/#">>)), + ?_assertEqual(false, Call(<<"a/b/b+">>)), + ?_assertEqual(false, Call(<<"a/b/c#">>)) + ].