diff --git a/apps/emqx/src/emqx_topic_gbt.erl b/apps/emqx/src/emqx_topic_gbt.erl index 063cba21d..6e9e7d2fc 100644 --- a/apps/emqx/src/emqx_topic_gbt.erl +++ b/apps/emqx/src/emqx_topic_gbt.erl @@ -14,14 +14,17 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Topic index implemetation with gb_trees stored in persistent_term. -%% This is only suitable for a static set of topic or topic-filters. +%% @doc Topic index implemetation with gb_trees as the underlying data +%% structure. -module(emqx_topic_gbt). --export([new/0, new/1]). +-export([new/0]). +-export([size/1]). -export([insert/4]). -export([delete/3]). +-export([lookup/4]). +-export([fold/3]). -export([match/2]). -export([matches/3]). @@ -29,53 +32,74 @@ -export([get_topic/1]). -export([get_record/2]). +-export_type([t/0, t/2, match/1]). + -type key(ID) :: emqx_trie_search:key(ID). -type words() :: emqx_trie_search:words(). -type match(ID) :: key(ID). --type name() :: any(). -%% @private Only for testing. --spec new() -> name(). -new() -> - new(test). +-opaque t(ID, Value) :: gb_trees:tree(key(ID), Value). +-opaque t() :: t(_ID, _Value). %% @doc Create a new gb_tree and store it in the persitent_term with the %% given name. --spec new(name()) -> name(). -new(Name) -> - T = gb_trees:from_orddict([]), - true = gbt_update(Name, T), - Name. +-spec new() -> t(). +new() -> + gb_trees:empty(). + +-spec size(t()) -> non_neg_integer(). +size(Gbt) -> + gb_trees:size(Gbt). %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true. -insert(Filter, ID, Record, Name) -> - Tree = gbt(Name), +-spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t(). +insert(Filter, ID, Record, Gbt) -> Key = key(Filter, ID), - NewTree = gb_trees:enter(Key, Record, Tree), - true = gbt_update(Name, NewTree). + gb_trees:enter(Key, Record, Gbt). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic() | words(), _ID, name()) -> true. -delete(Filter, ID, Name) -> - Tree = gbt(Name), +-spec delete(emqx_types:topic() | words(), _ID, t()) -> t(). +delete(Filter, ID, Gbt) -> Key = key(Filter, ID), - NewTree = gb_trees:delete_any(Key, Tree), - true = gbt_update(Name, NewTree). + gb_trees:delete_any(Key, Gbt). + +-spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default. +lookup(Filter, ID, Gbt, Default) -> + Key = key(Filter, ID), + case gb_trees:lookup(Key, Gbt) of + {value, Record} -> + Record; + none -> + Default + end. + +-spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc. +fold(Fun, Acc, Gbt) -> + Iter = gb_trees:iterator(Gbt), + fold_iter(Fun, Acc, Iter). + +fold_iter(Fun, Acc, Iter) -> + case gb_trees:next(Iter) of + {Key, Record, NIter} -> + fold_iter(Fun, Fun(Key, Record, Acc), NIter); + none -> + Acc + end. %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. --spec match(emqx_types:topic(), name()) -> match(_ID) | false. -match(Topic, Name) -> - emqx_trie_search:match(Topic, make_nextf(Name)). +-spec match(emqx_types:topic(), t()) -> match(_ID) | false. +match(Topic, Gbt) -> + emqx_trie_search:match(Topic, make_nextf(Gbt)). %% @doc Match given topic against the index and return _all_ matches. %% If `unique` option is given, return only unique matches by record ID. -matches(Topic, Name, Opts) -> - emqx_trie_search:matches(Topic, make_nextf(Name), Opts). +-spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)]. +matches(Topic, Gbt, Opts) -> + emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts). %% @doc Extract record ID from the match. -spec get_id(match(ID)) -> ID. @@ -88,21 +112,13 @@ get_topic(Key) -> emqx_trie_search:get_topic(Key). %% @doc Fetch the record associated with the match. --spec get_record(match(_ID), name()) -> _Record. -get_record(Key, Name) -> - Gbt = gbt(Name), +-spec get_record(match(_ID), t()) -> _Record. +get_record(Key, Gbt) -> gb_trees:get(Key, Gbt). key(TopicOrFilter, ID) -> emqx_trie_search:make_key(TopicOrFilter, ID). -gbt(Name) -> - persistent_term:get({?MODULE, Name}). - -gbt_update(Name, Tree) -> - persistent_term:put({?MODULE, Name}, Tree), - true. - gbt_next(nil, _Input) -> '$end_of_table'; gbt_next({P, _V, _Smaller, Bigger}, K) when K >= P -> @@ -115,6 +131,5 @@ gbt_next({P, _V, Smaller, _Bigger}, K) -> NextKey end. -make_nextf(Name) -> - {_SizeWeDontCare, TheTree} = gbt(Name), - fun(Key) -> gbt_next(TheTree, Key) end. +make_nextf({_Size, Tree}) -> + fun(Key) -> gbt_next(Tree, Key) end. diff --git a/apps/emqx/src/emqx_topic_gbt_pterm.erl b/apps/emqx/src/emqx_topic_gbt_pterm.erl new file mode 100644 index 000000000..4702fcb5f --- /dev/null +++ b/apps/emqx/src/emqx_topic_gbt_pterm.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Topic index implemetation with gb_tree as a persistent term. +%% This is only suitable for a static set of topic or topic-filters. + +-module(emqx_topic_gbt_pterm). + +-export([new/0, new/1]). +-export([insert/4]). +-export([delete/3]). +-export([match/2]). +-export([matches/3]). + +-export([get_record/2]). + +-type name() :: any(). +-type match(ID) :: emqx_topic_gbt:match(ID). + +%% @private Only for testing. +-spec new() -> name(). +new() -> + new(test). + +-spec new(name()) -> name(). +new(Name) -> + true = pterm_update(Name, emqx_topic_gbt:new()), + Name. + +-spec insert(emqx_types:topic() | emqx_trie_search:words(), _ID, _Record, name()) -> true. +insert(Filter, ID, Record, Name) -> + pterm_update(Name, emqx_topic_gbt:insert(Filter, ID, Record, pterm(Name))). + +-spec delete(emqx_types:topic() | emqx_trie_search:words(), _ID, name()) -> name(). +delete(Filter, ID, Name) -> + pterm_update(Name, emqx_topic_gbt:delete(Filter, ID, pterm(Name))). + +-spec match(emqx_types:topic(), name()) -> match(_ID) | false. +match(Topic, Name) -> + emqx_topic_gbt:match(Topic, pterm(Name)). + +-spec matches(emqx_types:topic(), name(), emqx_trie_search:opts()) -> [match(_ID)]. +matches(Topic, Name, Opts) -> + emqx_topic_gbt:matches(Topic, pterm(Name), Opts). + +%% @doc Fetch the record associated with the match. +-spec get_record(match(_ID), name()) -> _Record. +get_record(Key, Name) -> + emqx_topic_gbt:get_record(Key, pterm(Name)). + +%% + +pterm(Name) -> + persistent_term:get({?MODULE, Name}). + +pterm_update(Name, Tree) -> + persistent_term:put({?MODULE, Name}, Tree), + true. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 71e508306..a61edced6 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -40,7 +40,7 @@ groups() -> init_per_group(ets, Config) -> [{index_module, emqx_topic_index} | Config]; init_per_group(gb_tree, Config) -> - [{index_module, emqx_topic_gbt} | Config]. + [{index_module, emqx_topic_gbt_pterm} | Config]. end_per_group(_Group, _Config) -> ok.