mqtt_trie, mqtt_trie_node

This commit is contained in:
Feng Lee 2016-08-10 15:04:26 +08:00
parent 15be2037f3
commit 6686139bc6
2 changed files with 43 additions and 35 deletions

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type trie_node_id() :: binary() | atom(). -type(trie_node_id() :: binary() | atom()).
-record(trie_node, { -record(trie_node, {
node_id :: trie_node_id(), node_id :: trie_node_id(),
@ -24,12 +24,12 @@
}). }).
-record(trie_edge, { -record(trie_edge, {
node_id :: trie_node_id(), node_id :: trie_node_id(),
word :: binary() | atom() word :: binary() | atom()
}). }).
-record(trie, { -record(trie, {
edge :: #trie_edge{}, edge :: #trie_edge{},
node_id :: trie_node_id() node_id :: trie_node_id()
}). }).

View File

@ -38,21 +38,21 @@
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
%% Trie Table %% Trie Table
ok = emqttd_mnesia:create_table(trie, [ ok = emqttd_mnesia:create_table(mqtt_trie, [
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, trie}, {record_name, trie},
{attributes, record_info(fields, trie)}]), {attributes, record_info(fields, trie)}]),
%% Trie Node Table %% Trie Node Table
ok = emqttd_mnesia:create_table(trie_node, [ ok = emqttd_mnesia:create_table(mqtt_trie_node, [
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, trie_node}, {record_name, trie_node},
{attributes, record_info(fields, trie_node)}]); {attributes, record_info(fields, trie_node)}]);
mnesia(copy) -> mnesia(copy) ->
%% Copy Trie Table %% Copy Trie Table
ok = emqttd_mnesia:copy_table(trie), ok = emqttd_mnesia:copy_table(mqtt_trie),
%% Copy Trie Node Table %% Copy Trie Node Table
ok = emqttd_mnesia:copy_table(trie_node). ok = emqttd_mnesia:copy_table(mqtt_trie_node).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Trie API %% Trie API
@ -61,16 +61,16 @@ mnesia(copy) ->
%% @doc Insert topic to trie %% @doc Insert topic to trie
-spec(insert(Topic :: binary()) -> ok). -spec(insert(Topic :: binary()) -> ok).
insert(Topic) when is_binary(Topic) -> insert(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of case mnesia:read(mqtt_trie_node, Topic) of
[#trie_node{topic=Topic}] -> [#trie_node{topic=Topic}] ->
ok; ok;
[TrieNode=#trie_node{topic=undefined}] -> [TrieNode=#trie_node{topic=undefined}] ->
mnesia:write(TrieNode#trie_node{topic=Topic}); write_trie_node(TrieNode#trie_node{topic=Topic});
[] -> [] ->
%add trie path % Add trie path
lists:foreach(fun add_path/1, emqttd_topic:triples(Topic)), lists:foreach(fun add_path/1, emqttd_topic:triples(Topic)),
%add last node % Add last node
mnesia:write(#trie_node{node_id=Topic, topic=Topic}) write_trie_node(#trie_node{node_id=Topic, topic=Topic})
end. end.
%% @doc Find trie nodes that match topic %% @doc Find trie nodes that match topic
@ -82,17 +82,17 @@ match(Topic) when is_binary(Topic) ->
%% @doc Lookup a Trie Node %% @doc Lookup a Trie Node
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]). -spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
lookup(NodeId) -> lookup(NodeId) ->
mnesia:read(trie_node, NodeId). mnesia:read(mqtt_trie_node, NodeId).
%% @doc Delete topic from trie %% @doc Delete topic from trie
-spec(delete(Topic :: binary()) -> ok). -spec(delete(Topic :: binary()) -> ok).
delete(Topic) when is_binary(Topic) -> delete(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of case mnesia:read(mqtt_trie_node, Topic) of
[#trie_node{edge_count=0}] -> [#trie_node{edge_count=0}] ->
mnesia:delete({trie_node, Topic}), mnesia:delete({mqtt_trie_node, Topic}),
delete_path(lists:reverse(emqttd_topic:triples(Topic))); delete_path(lists:reverse(emqttd_topic:triples(Topic)));
[TrieNode] -> [TrieNode] ->
mnesia:write(TrieNode#trie_node{topic = undefined}); write_trie_node(TrieNode#trie_node{topic = undefined});
[] -> [] ->
ok ok
end. end.
@ -105,18 +105,18 @@ delete(Topic) when is_binary(Topic) ->
%% @doc Add path to trie tree. %% @doc Add path to trie tree.
add_path({Node, Word, Child}) -> add_path({Node, Word, Child}) ->
Edge = #trie_edge{node_id=Node, word=Word}, Edge = #trie_edge{node_id=Node, word=Word},
case mnesia:read(trie_node, Node) of case mnesia:read(mqtt_trie_node, Node) of
[TrieNode = #trie_node{edge_count=Count}] -> [TrieNode = #trie_node{edge_count=Count}] ->
case mnesia:wread({trie, Edge}) of case mnesia:wread({mqtt_trie, Edge}) of
[] -> [] ->
mnesia:write(TrieNode#trie_node{edge_count=Count+1}), write_trie_node(TrieNode#trie_node{edge_count=Count+1}),
mnesia:write(#trie{edge=Edge, node_id=Child}); write_trie(#trie{edge=Edge, node_id=Child});
[_] -> [_] ->
ok ok
end; end;
[] -> [] ->
mnesia:write(#trie_node{node_id=Node, edge_count=1}), write_trie_node(#trie_node{node_id=Node, edge_count=1}),
mnesia:write(#trie{edge=Edge, node_id=Child}) write_trie(#trie{edge=Edge, node_id=Child})
end. end.
%% @private %% @private
@ -128,11 +128,11 @@ match_node(NodeId, Words) ->
match_node(NodeId, Words, []). match_node(NodeId, Words, []).
match_node(NodeId, [], ResAcc) -> match_node(NodeId, [], ResAcc) ->
mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc); mnesia:read(mqtt_trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc);
match_node(NodeId, [W|Words], ResAcc) -> match_node(NodeId, [W|Words], ResAcc) ->
lists:foldl(fun(WArg, Acc) -> lists:foldl(fun(WArg, Acc) ->
case mnesia:read(trie, #trie_edge{node_id=NodeId, word=WArg}) of case mnesia:read(mqtt_trie, #trie_edge{node_id=NodeId, word=WArg}) of
[#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc); [#trie{node_id=ChildId}] -> match_node(ChildId, Words, Acc);
[] -> Acc [] -> Acc
end end
@ -141,9 +141,9 @@ match_node(NodeId, [W|Words], ResAcc) ->
%% @private %% @private
%% @doc Match node with '#'. %% @doc Match node with '#'.
'match_#'(NodeId, ResAcc) -> 'match_#'(NodeId, ResAcc) ->
case mnesia:read(trie, #trie_edge{node_id=NodeId, word = '#'}) of case mnesia:read(mqtt_trie, #trie_edge{node_id=NodeId, word = '#'}) of
[#trie{node_id=ChildId}] -> [#trie{node_id=ChildId}] ->
mnesia:read(trie_node, ChildId) ++ ResAcc; mnesia:read(mqtt_trie_node, ChildId) ++ ResAcc;
[] -> [] ->
ResAcc ResAcc
end. end.
@ -153,16 +153,24 @@ match_node(NodeId, [W|Words], ResAcc) ->
delete_path([]) -> delete_path([]) ->
ok; ok;
delete_path([{NodeId, Word, _} | RestPath]) -> delete_path([{NodeId, Word, _} | RestPath]) ->
mnesia:delete({trie, #trie_edge{node_id=NodeId, word=Word}}), mnesia:delete({mqtt_trie, #trie_edge{node_id=NodeId, word=Word}}),
case mnesia:read(trie_node, NodeId) of case mnesia:read(mqtt_trie_node, NodeId) of
[#trie_node{edge_count=1, topic=undefined}] -> [#trie_node{edge_count=1, topic=undefined}] ->
mnesia:delete({trie_node, NodeId}), mnesia:delete({mqtt_trie_node, NodeId}),
delete_path(RestPath); delete_path(RestPath);
[TrieNode=#trie_node{edge_count=1, topic=_}] -> [TrieNode=#trie_node{edge_count=1, topic=_}] ->
mnesia:write(TrieNode#trie_node{edge_count=0}); write_trie_node(TrieNode#trie_node{edge_count=0});
[TrieNode=#trie_node{edge_count=C}] -> [TrieNode=#trie_node{edge_count=C}] ->
mnesia:write(TrieNode#trie_node{edge_count=C-1}); write_trie_node(TrieNode#trie_node{edge_count=C-1});
[] -> [] ->
throw({notfound, NodeId}) throw({notfound, NodeId})
end. end.
%% @private
write_trie(Trie) ->
mnesia:write(mqtt_trie, Trie, write).
%% @private
write_trie_node(TrieNode) ->
mnesia:write(mqtt_trie_node, TrieNode, write).