refactor(topicidx): split persistent term stuff off gbt-based index
This commit is contained in:
parent
88103c5f0e
commit
508346f095
|
@ -14,14 +14,17 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Topic index implemetation with gb_trees stored in persistent_term.
|
||||
%% This is only suitable for a static set of topic or topic-filters.
|
||||
%% @doc Topic index implemetation with gb_trees as the underlying data
|
||||
%% structure.
|
||||
|
||||
-module(emqx_topic_gbt).
|
||||
|
||||
-export([new/0, new/1]).
|
||||
-export([new/0]).
|
||||
-export([size/1]).
|
||||
-export([insert/4]).
|
||||
-export([delete/3]).
|
||||
-export([lookup/4]).
|
||||
-export([fold/3]).
|
||||
-export([match/2]).
|
||||
-export([matches/3]).
|
||||
|
||||
|
@ -29,53 +32,74 @@
|
|||
-export([get_topic/1]).
|
||||
-export([get_record/2]).
|
||||
|
||||
-export_type([t/0, t/2, match/1]).
|
||||
|
||||
-type key(ID) :: emqx_trie_search:key(ID).
|
||||
-type words() :: emqx_trie_search:words().
|
||||
-type match(ID) :: key(ID).
|
||||
-type name() :: any().
|
||||
|
||||
%% @private Only for testing.
|
||||
-spec new() -> name().
|
||||
new() ->
|
||||
new(test).
|
||||
-opaque t(ID, Value) :: gb_trees:tree(key(ID), Value).
|
||||
-opaque t() :: t(_ID, _Value).
|
||||
|
||||
%% @doc Create a new gb_tree and store it in the persitent_term with the
|
||||
%% given name.
|
||||
-spec new(name()) -> name().
|
||||
new(Name) ->
|
||||
T = gb_trees:from_orddict([]),
|
||||
true = gbt_update(Name, T),
|
||||
Name.
|
||||
-spec new() -> t().
|
||||
new() ->
|
||||
gb_trees:empty().
|
||||
|
||||
-spec size(t()) -> non_neg_integer().
|
||||
size(Gbt) ->
|
||||
gb_trees:size(Gbt).
|
||||
|
||||
%% @doc Insert a new entry into the index that associates given topic filter to given
|
||||
%% record ID, and attaches arbitrary record to the entry. This allows users to choose
|
||||
%% between regular and "materialized" indexes, for example.
|
||||
-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true.
|
||||
insert(Filter, ID, Record, Name) ->
|
||||
Tree = gbt(Name),
|
||||
-spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t().
|
||||
insert(Filter, ID, Record, Gbt) ->
|
||||
Key = key(Filter, ID),
|
||||
NewTree = gb_trees:enter(Key, Record, Tree),
|
||||
true = gbt_update(Name, NewTree).
|
||||
gb_trees:enter(Key, Record, Gbt).
|
||||
|
||||
%% @doc Delete an entry from the index that associates given topic filter to given
|
||||
%% record ID. Deleting non-existing entry is not an error.
|
||||
-spec delete(emqx_types:topic() | words(), _ID, name()) -> true.
|
||||
delete(Filter, ID, Name) ->
|
||||
Tree = gbt(Name),
|
||||
-spec delete(emqx_types:topic() | words(), _ID, t()) -> t().
|
||||
delete(Filter, ID, Gbt) ->
|
||||
Key = key(Filter, ID),
|
||||
NewTree = gb_trees:delete_any(Key, Tree),
|
||||
true = gbt_update(Name, NewTree).
|
||||
gb_trees:delete_any(Key, Gbt).
|
||||
|
||||
-spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default.
|
||||
lookup(Filter, ID, Gbt, Default) ->
|
||||
Key = key(Filter, ID),
|
||||
case gb_trees:lookup(Key, Gbt) of
|
||||
{value, Record} ->
|
||||
Record;
|
||||
none ->
|
||||
Default
|
||||
end.
|
||||
|
||||
-spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc.
|
||||
fold(Fun, Acc, Gbt) ->
|
||||
Iter = gb_trees:iterator(Gbt),
|
||||
fold_iter(Fun, Acc, Iter).
|
||||
|
||||
fold_iter(Fun, Acc, Iter) ->
|
||||
case gb_trees:next(Iter) of
|
||||
{Key, Record, NIter} ->
|
||||
fold_iter(Fun, Fun(Key, Record, Acc), NIter);
|
||||
none ->
|
||||
Acc
|
||||
end.
|
||||
|
||||
%% @doc Match given topic against the index and return the first match, or `false` if
|
||||
%% no match is found.
|
||||
-spec match(emqx_types:topic(), name()) -> match(_ID) | false.
|
||||
match(Topic, Name) ->
|
||||
emqx_trie_search:match(Topic, make_nextf(Name)).
|
||||
-spec match(emqx_types:topic(), t()) -> match(_ID) | false.
|
||||
match(Topic, Gbt) ->
|
||||
emqx_trie_search:match(Topic, make_nextf(Gbt)).
|
||||
|
||||
%% @doc Match given topic against the index and return _all_ matches.
|
||||
%% If `unique` option is given, return only unique matches by record ID.
|
||||
matches(Topic, Name, Opts) ->
|
||||
emqx_trie_search:matches(Topic, make_nextf(Name), Opts).
|
||||
-spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)].
|
||||
matches(Topic, Gbt, Opts) ->
|
||||
emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts).
|
||||
|
||||
%% @doc Extract record ID from the match.
|
||||
-spec get_id(match(ID)) -> ID.
|
||||
|
@ -88,21 +112,13 @@ get_topic(Key) ->
|
|||
emqx_trie_search:get_topic(Key).
|
||||
|
||||
%% @doc Fetch the record associated with the match.
|
||||
-spec get_record(match(_ID), name()) -> _Record.
|
||||
get_record(Key, Name) ->
|
||||
Gbt = gbt(Name),
|
||||
-spec get_record(match(_ID), t()) -> _Record.
|
||||
get_record(Key, Gbt) ->
|
||||
gb_trees:get(Key, Gbt).
|
||||
|
||||
key(TopicOrFilter, ID) ->
|
||||
emqx_trie_search:make_key(TopicOrFilter, ID).
|
||||
|
||||
gbt(Name) ->
|
||||
persistent_term:get({?MODULE, Name}).
|
||||
|
||||
gbt_update(Name, Tree) ->
|
||||
persistent_term:put({?MODULE, Name}, Tree),
|
||||
true.
|
||||
|
||||
gbt_next(nil, _Input) ->
|
||||
'$end_of_table';
|
||||
gbt_next({P, _V, _Smaller, Bigger}, K) when K >= P ->
|
||||
|
@ -115,6 +131,5 @@ gbt_next({P, _V, Smaller, _Bigger}, K) ->
|
|||
NextKey
|
||||
end.
|
||||
|
||||
make_nextf(Name) ->
|
||||
{_SizeWeDontCare, TheTree} = gbt(Name),
|
||||
fun(Key) -> gbt_next(TheTree, Key) end.
|
||||
make_nextf({_Size, Tree}) ->
|
||||
fun(Key) -> gbt_next(Tree, Key) end.
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Topic index implemetation with gb_tree as a persistent term.
|
||||
%% This is only suitable for a static set of topic or topic-filters.
|
||||
|
||||
-module(emqx_topic_gbt_pterm).
|
||||
|
||||
-export([new/0, new/1]).
|
||||
-export([insert/4]).
|
||||
-export([delete/3]).
|
||||
-export([match/2]).
|
||||
-export([matches/3]).
|
||||
|
||||
-export([get_record/2]).
|
||||
|
||||
-type name() :: any().
|
||||
-type match(ID) :: emqx_topic_gbt:match(ID).
|
||||
|
||||
%% @private Only for testing.
|
||||
-spec new() -> name().
|
||||
new() ->
|
||||
new(test).
|
||||
|
||||
-spec new(name()) -> name().
|
||||
new(Name) ->
|
||||
true = pterm_update(Name, emqx_topic_gbt:new()),
|
||||
Name.
|
||||
|
||||
-spec insert(emqx_types:topic() | emqx_trie_search:words(), _ID, _Record, name()) -> true.
|
||||
insert(Filter, ID, Record, Name) ->
|
||||
pterm_update(Name, emqx_topic_gbt:insert(Filter, ID, Record, pterm(Name))).
|
||||
|
||||
-spec delete(emqx_types:topic() | emqx_trie_search:words(), _ID, name()) -> name().
|
||||
delete(Filter, ID, Name) ->
|
||||
pterm_update(Name, emqx_topic_gbt:delete(Filter, ID, pterm(Name))).
|
||||
|
||||
-spec match(emqx_types:topic(), name()) -> match(_ID) | false.
|
||||
match(Topic, Name) ->
|
||||
emqx_topic_gbt:match(Topic, pterm(Name)).
|
||||
|
||||
-spec matches(emqx_types:topic(), name(), emqx_trie_search:opts()) -> [match(_ID)].
|
||||
matches(Topic, Name, Opts) ->
|
||||
emqx_topic_gbt:matches(Topic, pterm(Name), Opts).
|
||||
|
||||
%% @doc Fetch the record associated with the match.
|
||||
-spec get_record(match(_ID), name()) -> _Record.
|
||||
get_record(Key, Name) ->
|
||||
emqx_topic_gbt:get_record(Key, pterm(Name)).
|
||||
|
||||
%%
|
||||
|
||||
pterm(Name) ->
|
||||
persistent_term:get({?MODULE, Name}).
|
||||
|
||||
pterm_update(Name, Tree) ->
|
||||
persistent_term:put({?MODULE, Name}, Tree),
|
||||
true.
|
|
@ -40,7 +40,7 @@ groups() ->
|
|||
init_per_group(ets, Config) ->
|
||||
[{index_module, emqx_topic_index} | Config];
|
||||
init_per_group(gb_tree, Config) ->
|
||||
[{index_module, emqx_topic_gbt} | Config].
|
||||
[{index_module, emqx_topic_gbt_pterm} | Config].
|
||||
|
||||
end_per_group(_Group, _Config) ->
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue