From 95446ca83780481c7656c9cc04ed41159dcbe06a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 30 Nov 2018 20:37:10 +0800 Subject: [PATCH] Optimize the route and trie modules. 1. Use mnesia:wread/1 to replace mnesia:read/2 2. Update the router supervisor --- src/emqx_router.erl | 15 ++++++++------- src/emqx_router_helper.erl | 4 ++-- src/emqx_router_sup.erl | 8 ++++++-- src/emqx_trie.erl | 14 +++++++------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 2ac656d56..941c004f7 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -34,7 +34,8 @@ -export([has_routes/1, match_routes/1, print_routes/1]). -export([topics/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -type(destination() :: node() | {binary(), node()}). @@ -45,9 +46,9 @@ -define(BATCH(Enabled), #batch{enabled = Enabled}). -define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Mnesia bootstrap -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ mnesia(boot) -> ok = ekka_mnesia:create_table(?ROUTE, [ @@ -132,9 +133,9 @@ cast(Router, Msg) -> pick(Topic) -> gproc_pool:pick_worker(router, Topic). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([Pool, Id]) -> rand:seed(exsplus, erlang:timestamp()), @@ -207,9 +208,9 @@ terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) -> State; diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 7bacddd4c..c24b10715 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -84,8 +84,8 @@ monitor(Node) when is_atom(Node) -> %%------------------------------------------------------------------------------ init([]) -> - ekka:monitor(membership), - mnesia:subscribe({table, ?ROUTING_NODE, simple}), + _ = ekka:monitor(membership), + _ = mnesia:subscribe({table, ?ROUTING_NODE, simple}), Nodes = lists:foldl( fun(Node, Acc) -> case ekka:is_member(Node) of diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index 004b88bb8..2bbaabc18 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -24,8 +24,12 @@ start_link() -> init([]) -> %% Router helper - Helper = {router_helper, {emqx_router_helper, start_link, []}, - permanent, 5000, worker, [emqx_router_helper]}, + Helper = #{id => helper, + start => {emqx_router_helper, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_router_helper]}, %% Router pool RouterPool = emqx_pool_sup:spec(emqx_router_pool, diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 569de9092..79f6042b7 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -62,10 +62,10 @@ mnesia(copy) -> %% Trie APIs %%------------------------------------------------------------------------------ -%% @doc Insert a topic into the trie +%% @doc Insert a topic filter into the trie. -spec(insert(emqx_topic:topic()) -> ok). insert(Topic) when is_binary(Topic) -> - case mnesia:read(?TRIE_NODE, Topic) of + case mnesia:wread({?TRIE_NODE, Topic}) of [#trie_node{topic = Topic}] -> ok; [TrieNode = #trie_node{topic = undefined}] -> @@ -77,21 +77,21 @@ insert(Topic) when is_binary(Topic) -> write_trie_node(#trie_node{node_id = Topic, topic = Topic}) end. -%% @doc Find trie nodes that match the topic +%% @doc Find trie nodes that match the topic name. -spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())). match(Topic) when is_binary(Topic) -> TrieNodes = match_node(root, emqx_topic:words(Topic)), [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined]. -%% @doc Lookup a trie node +%% @doc Lookup a trie node. -spec(lookup(NodeId :: binary()) -> [#trie_node{}]). lookup(NodeId) -> mnesia:read(?TRIE_NODE, NodeId). -%% @doc Delete a topic from the trie +%% @doc Delete a topic filter from the trie. -spec(delete(emqx_topic:topic()) -> ok). delete(Topic) when is_binary(Topic) -> - case mnesia:read(?TRIE_NODE, Topic) of + case mnesia:wread({?TRIE_NODE, Topic}) of [#trie_node{edge_count = 0}] -> mnesia:delete({?TRIE_NODE, Topic}), delete_path(lists:reverse(emqx_topic:triples(Topic))); @@ -108,7 +108,7 @@ delete(Topic) when is_binary(Topic) -> %% @doc Add a path to the trie. add_path({Node, Word, Child}) -> Edge = #trie_edge{node_id = Node, word = Word}, - case mnesia:read(?TRIE_NODE, Node) of + case mnesia:wread({?TRIE_NODE, Node}) of [TrieNode = #trie_node{edge_count = Count}] -> case mnesia:wread({?TRIE, Edge}) of [] ->