Optimize the route and trie modules.
1. Use mnesia:wread/1 to replace mnesia:read/2 2. Update the router supervisor
This commit is contained in:
parent
61030c8d10
commit
95446ca837
|
@ -34,7 +34,8 @@
|
||||||
-export([has_routes/1, match_routes/1, print_routes/1]).
|
-export([has_routes/1, match_routes/1, print_routes/1]).
|
||||||
-export([topics/0]).
|
-export([topics/0]).
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
|
code_change/3]).
|
||||||
|
|
||||||
-type(destination() :: node() | {binary(), node()}).
|
-type(destination() :: node() | {binary(), node()}).
|
||||||
|
|
||||||
|
@ -45,9 +46,9 @@
|
||||||
-define(BATCH(Enabled), #batch{enabled = Enabled}).
|
-define(BATCH(Enabled), #batch{enabled = Enabled}).
|
||||||
-define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}).
|
-define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
ok = ekka_mnesia:create_table(?ROUTE, [
|
ok = ekka_mnesia:create_table(?ROUTE, [
|
||||||
|
@ -132,9 +133,9 @@ cast(Router, Msg) ->
|
||||||
pick(Topic) ->
|
pick(Topic) ->
|
||||||
gproc_pool:pick_worker(router, Topic).
|
gproc_pool:pick_worker(router, Topic).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
rand:seed(exsplus, erlang:timestamp()),
|
rand:seed(exsplus, erlang:timestamp()),
|
||||||
|
@ -207,9 +208,9 @@ terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
|
ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
|
||||||
State;
|
State;
|
||||||
|
|
|
@ -84,8 +84,8 @@ monitor(Node) when is_atom(Node) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ekka:monitor(membership),
|
_ = ekka:monitor(membership),
|
||||||
mnesia:subscribe({table, ?ROUTING_NODE, simple}),
|
_ = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
|
||||||
Nodes = lists:foldl(
|
Nodes = lists:foldl(
|
||||||
fun(Node, Acc) ->
|
fun(Node, Acc) ->
|
||||||
case ekka:is_member(Node) of
|
case ekka:is_member(Node) of
|
||||||
|
|
|
@ -24,8 +24,12 @@ start_link() ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% Router helper
|
%% Router helper
|
||||||
Helper = {router_helper, {emqx_router_helper, start_link, []},
|
Helper = #{id => helper,
|
||||||
permanent, 5000, worker, [emqx_router_helper]},
|
start => {emqx_router_helper, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 5000,
|
||||||
|
type => worker,
|
||||||
|
modules => [emqx_router_helper]},
|
||||||
|
|
||||||
%% Router pool
|
%% Router pool
|
||||||
RouterPool = emqx_pool_sup:spec(emqx_router_pool,
|
RouterPool = emqx_pool_sup:spec(emqx_router_pool,
|
||||||
|
|
|
@ -62,10 +62,10 @@ mnesia(copy) ->
|
||||||
%% Trie APIs
|
%% Trie APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Insert a topic into the trie
|
%% @doc Insert a topic filter into the trie.
|
||||||
-spec(insert(emqx_topic:topic()) -> ok).
|
-spec(insert(emqx_topic:topic()) -> ok).
|
||||||
insert(Topic) when is_binary(Topic) ->
|
insert(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(?TRIE_NODE, Topic) of
|
case mnesia:wread({?TRIE_NODE, Topic}) of
|
||||||
[#trie_node{topic = Topic}] ->
|
[#trie_node{topic = Topic}] ->
|
||||||
ok;
|
ok;
|
||||||
[TrieNode = #trie_node{topic = undefined}] ->
|
[TrieNode = #trie_node{topic = undefined}] ->
|
||||||
|
@ -77,21 +77,21 @@ insert(Topic) when is_binary(Topic) ->
|
||||||
write_trie_node(#trie_node{node_id = Topic, topic = Topic})
|
write_trie_node(#trie_node{node_id = Topic, topic = Topic})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Find trie nodes that match the topic
|
%% @doc Find trie nodes that match the topic name.
|
||||||
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
||||||
match(Topic) when is_binary(Topic) ->
|
match(Topic) when is_binary(Topic) ->
|
||||||
TrieNodes = match_node(root, emqx_topic:words(Topic)),
|
TrieNodes = match_node(root, emqx_topic:words(Topic)),
|
||||||
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
|
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
|
||||||
|
|
||||||
%% @doc Lookup a trie node
|
%% @doc Lookup a trie node.
|
||||||
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
||||||
lookup(NodeId) ->
|
lookup(NodeId) ->
|
||||||
mnesia:read(?TRIE_NODE, NodeId).
|
mnesia:read(?TRIE_NODE, NodeId).
|
||||||
|
|
||||||
%% @doc Delete a topic from the trie
|
%% @doc Delete a topic filter from the trie.
|
||||||
-spec(delete(emqx_topic:topic()) -> ok).
|
-spec(delete(emqx_topic:topic()) -> ok).
|
||||||
delete(Topic) when is_binary(Topic) ->
|
delete(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(?TRIE_NODE, Topic) of
|
case mnesia:wread({?TRIE_NODE, Topic}) of
|
||||||
[#trie_node{edge_count = 0}] ->
|
[#trie_node{edge_count = 0}] ->
|
||||||
mnesia:delete({?TRIE_NODE, Topic}),
|
mnesia:delete({?TRIE_NODE, Topic}),
|
||||||
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
||||||
|
@ -108,7 +108,7 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
%% @doc Add a path to the trie.
|
%% @doc Add a path to the trie.
|
||||||
add_path({Node, Word, Child}) ->
|
add_path({Node, Word, Child}) ->
|
||||||
Edge = #trie_edge{node_id = Node, word = Word},
|
Edge = #trie_edge{node_id = Node, word = Word},
|
||||||
case mnesia:read(?TRIE_NODE, Node) of
|
case mnesia:wread({?TRIE_NODE, Node}) of
|
||||||
[TrieNode = #trie_node{edge_count = Count}] ->
|
[TrieNode = #trie_node{edge_count = Count}] ->
|
||||||
case mnesia:wread({?TRIE, Edge}) of
|
case mnesia:wread({?TRIE, Edge}) of
|
||||||
[] ->
|
[] ->
|
||||||
|
|
Loading…
Reference in New Issue