emqx/src/emqx_trie.erl

221 lines
7.1 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trie).
-include("emqx.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Trie APIs
-export([ insert/1
, match/1
, lookup/1
, delete/1
]).
-export([ empty/0
, lock_tables/0
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-type(triple() :: {root | binary(), emqx_topic:word(), binary()}).
%% Mnesia tables
-define(TRIE_TAB, emqx_trie).
-define(TRIE_NODE_TAB, emqx_trie_node).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
%% @doc Create or replicate trie tables.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Optimize storage
StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true}]}],
%% Trie table
ok = ekka_mnesia:create_table(?TRIE_TAB, [
{ram_copies, [node()]},
{record_name, trie},
{attributes, record_info(fields, trie)},
{storage_properties, StoreProps}]),
%% Trie node table
ok = ekka_mnesia:create_table(?TRIE_NODE_TAB, [
{ram_copies, [node()]},
{record_name, trie_node},
{attributes, record_info(fields, trie_node)},
{storage_properties, StoreProps}]);
mnesia(copy) ->
%% Copy trie table
ok = ekka_mnesia:copy_table(?TRIE_TAB),
%% Copy trie_node table
ok = ekka_mnesia:copy_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Trie APIs
%%--------------------------------------------------------------------
%% @doc Insert a topic filter into the trie.
-spec(insert(emqx_topic:topic()) -> ok).
insert(Topic) when is_binary(Topic) ->
case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
[#trie_node{topic = Topic}] ->
ok;
[TrieNode = #trie_node{topic = undefined}] ->
write_trie_node(TrieNode#trie_node{topic = Topic});
[] ->
%% Add trie path
ok = lists:foreach(fun add_path/1, triples(Topic)),
%% Add last node
write_trie_node(#trie_node{node_id = Topic, topic = Topic})
end.
%% @doc Find trie nodes that match the topic name.
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
match(Topic) when is_binary(Topic) ->
TrieNodes = match_node(root, emqx_topic:words(Topic)),
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
%% @doc Lookup a trie node.
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
lookup(NodeId) ->
mnesia:read(?TRIE_NODE_TAB, NodeId).
%% @doc Delete a topic filter from the trie.
-spec(delete(emqx_topic:topic()) -> ok).
delete(Topic) when is_binary(Topic) ->
case mnesia:wread({?TRIE_NODE_TAB, Topic}) of
[#trie_node{edge_count = 0}] ->
ok = mnesia:delete({?TRIE_NODE_TAB, Topic}),
delete_path(lists:reverse(triples(Topic)));
[TrieNode] ->
write_trie_node(TrieNode#trie_node{topic = undefined});
[] -> ok
end.
%% @doc Is the trie empty?
-spec(empty() -> boolean()).
empty() ->
ets:info(?TRIE_TAB, size) == 0.
-spec lock_tables() -> ok.
lock_tables() ->
mnesia:write_lock_table(?TRIE_TAB),
mnesia:write_lock_table(?TRIE_NODE_TAB).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% @doc Topic to triples.
-spec(triples(emqx_topic:topic()) -> list(triple())).
triples(Topic) when is_binary(Topic) ->
triples(emqx_topic:words(Topic), root, []).
triples([], _Parent, Acc) ->
lists:reverse(Acc);
triples([W|Words], Parent, Acc) ->
Node = join(Parent, W),
triples(Words, Node, [{Parent, W, Node}|Acc]).
join(root, W) ->
emqx_topic:join([W]);
join(Parent, W) ->
emqx_topic:join([Parent, W]).
%% @private
%% @doc Add a path to the trie.
add_path({Node, Word, Child}) ->
Edge = #trie_edge{node_id = Node, word = Word},
case mnesia:wread({?TRIE_NODE_TAB, Node}) of
[TrieNode = #trie_node{edge_count = Count}] ->
case mnesia:wread({?TRIE_TAB, Edge}) of
[] ->
ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}),
write_trie(#trie{edge = Edge, node_id = Child});
[_] -> ok
end;
[] ->
ok = write_trie_node(#trie_node{node_id = Node, edge_count = 1}),
write_trie(#trie{edge = Edge, node_id = Child})
end.
%% @private
%% @doc Match node with word or '+'.
match_node(root, [NodeId = <<$$, _/binary>>|Words]) ->
match_node(NodeId, Words, []);
match_node(NodeId, Words) ->
match_node(NodeId, Words, []).
match_node(NodeId, [], ResAcc) ->
mnesia:read(?TRIE_NODE_TAB, NodeId) ++ 'match_#'(NodeId, ResAcc);
match_node(NodeId, [W|Words], ResAcc) ->
lists:foldl(fun(WArg, Acc) ->
case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = WArg}) of
[#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
[] -> Acc
end
end, 'match_#'(NodeId, ResAcc), [W, '+']).
%% @private
%% @doc Match node with '#'.
'match_#'(NodeId, ResAcc) ->
case mnesia:read(?TRIE_TAB, #trie_edge{node_id = NodeId, word = '#'}) of
[#trie{node_id = ChildId}] ->
mnesia:read(?TRIE_NODE_TAB, ChildId) ++ ResAcc;
[] -> ResAcc
end.
%% @private
%% @doc Delete paths from the trie.
delete_path([]) ->
ok;
delete_path([{NodeId, Word, _} | RestPath]) ->
ok = mnesia:delete({?TRIE_TAB, #trie_edge{node_id = NodeId, word = Word}}),
case mnesia:wread({?TRIE_NODE_TAB, NodeId}) of
[#trie_node{edge_count = 1, topic = undefined}] ->
ok = mnesia:delete({?TRIE_NODE_TAB, NodeId}),
delete_path(RestPath);
[TrieNode = #trie_node{edge_count = 1, topic = _}] ->
write_trie_node(TrieNode#trie_node{edge_count = 0});
[TrieNode = #trie_node{edge_count = C}] ->
write_trie_node(TrieNode#trie_node{edge_count = C-1});
[] ->
mnesia:abort({node_not_found, NodeId})
end.
%% @private
write_trie(Trie) ->
mnesia:write(?TRIE_TAB, Trie, write).
%% @private
write_trie_node(TrieNode) ->
mnesia:write(?TRIE_NODE_TAB, TrieNode, write).