From cca5081e021c509dbdba07bb9037e9321f8cf546 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 10 Dec 2018 18:37:42 +0800 Subject: [PATCH 1/4] Improve the design of trie, router and broker modules 1. Add do_add_route/1 do_add_route/2, do_delete_route/1, do_delete_route/2 APIs in emqx_router module 2. Improve the code of emqx_trie module 3. Update the emqx_broker module to call the new APIs of emqx_router --- src/emqx_broker.erl | 30 ++++----- src/emqx_router.erl | 134 ++++++++++++++++++------------------- src/emqx_router_helper.erl | 45 +++++++------ src/emqx_shared_sub.erl | 7 +- src/emqx_trie.erl | 18 ++--- test/emqx_router_SUITE.erl | 103 ++++++++++------------------ test/emqx_trie_SUITE.erl | 113 ++++++++++++++++--------------- 7 files changed, 208 insertions(+), 242 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index a2ddf5b60..a00dc17b8 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -27,6 +27,7 @@ -export([subscriptions/1, subscribers/1, subscribed/2]). -export([get_subopts/2, set_subopts/2]). -export([topics/0]). + %% Stats fun -export([stats_fun/0]). @@ -52,9 +53,9 @@ -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - _ = create_tabs(), - Name = emqx_misc:proc_name(?BROKER, Id), - gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []). + ok = create_tabs(), + gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, + ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ %% Create tabs @@ -75,7 +76,7 @@ create_tabs() -> ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... - %% duplicate_bag: o(1) insert + %% bag: o(n) insert ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ @@ -114,7 +115,7 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map call(pick({Topic, Shard}), {subscribe, Topic}); Group -> %% Shard subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shard_sub:subscribe(Group, Topic, SubPid) + emqx_shared_sub:subscribe(Group, Topic, SubPid) end; true -> ok end. @@ -367,18 +368,17 @@ pick(Topic) -> %%------------------------------------------------------------------------------ init([Pool, Id]) -> - _ = emqx_router:set_mode(protected), true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. handle_call({subscribe, Topic}, _From, State) -> - case get(Topic) of - undefined -> - _ = put(Topic, true), - emqx_router:add_route(Topic); - true -> ok - end, - {reply, ok, State}; + Ok = case get(Topic) of + undefined -> + _ = put(Topic, true), + emqx_router:do_add_route(Topic); + true -> ok + end, + {reply, Ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), @@ -387,8 +387,8 @@ handle_call(Req, _From, State) -> handle_cast({unsubscribed, Topic}, State) -> case ets:member(?SUBSCRIBER, Topic) of false -> - _ = erase(Topic), - emqx_router:delete_route(Topic); + _ = erase(Topic), + emqx_router:do_delete_route(Topic); true -> ok end, {noreply, State}; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 313adc475..c165b97ca 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -29,19 +29,19 @@ %% Route APIs -export([add_route/1, add_route/2]). --export([get_routes/1]). +-export([do_add_route/1, do_add_route/2]). +-export([match_routes/1, lookup_routes/1, has_routes/1]). -export([delete_route/1, delete_route/2]). --export([has_routes/1, match_routes/1, print_routes/1]). +-export([do_delete_route/1, do_delete_route/2]). +-export([print_routes/1]). -export([topics/0]). -%% Mode --export([set_mode/1, get_mode/0]). - %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --type(destination() :: node() | {binary(), node()}). +-type(group() :: binary()). +-type(destination() :: node() | {group(), node()}). -define(ROUTE, emqx_route). @@ -66,75 +66,76 @@ mnesia(copy) -> -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - Name = emqx_misc:proc_name(?MODULE, Id), - gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %%------------------------------------------------------------------------------ %% Route APIs %%------------------------------------------------------------------------------ --spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). +-spec(add_route(emqx_topic:topic()) -> ok | {error, term()}). add_route(Topic) when is_binary(Topic) -> - add_route(#route{topic = Topic, dest = node()}); -add_route(Route = #route{topic = Topic}) -> - case get_mode() of - protected -> do_add_route(Route); - undefined -> call(pick(Topic), {add_route, Route}) - end. + add_route(Topic, node()). -spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). add_route(Topic, Dest) when is_binary(Topic) -> - add_route(#route{topic = Topic, dest = Dest}). + call(pick(Topic), {add_route, Topic, Dest}). -%% @private -do_add_route(Route = #route{topic = Topic, dest = Dest}) -> - case lists:member(Route, get_routes(Topic)) of +-spec(do_add_route(emqx_topic:topic()) -> ok | {error, term()}). +do_add_route(Topic) when is_binary(Topic) -> + do_add_route(Topic, node()). + +-spec(do_add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +do_add_route(Topic, Dest) when is_binary(Topic) -> + Route = #route{topic = Topic, dest = Dest}, + case lists:member(Route, lookup_routes(Topic)) of true -> ok; false -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of - true -> trans(fun add_trie_route/1, [Route]); - false -> add_direct_route(Route) + true -> trans(fun insert_trie_route/1, [Route]); + false -> insert_direct_route(Route) end end. --spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]). -get_routes(Topic) -> +%% @doc Match routes +-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). +match_routes(Topic) when is_binary(Topic) -> + %% Optimize: routing table will be replicated to all router nodes. + Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), + lists:append([lookup_routes(To) || To <- [Topic | Matched]]). + +-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). +lookup_routes(Topic) -> ets:lookup(?ROUTE, Topic). --spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). -delete_route(Topic) when is_binary(Topic) -> - delete_route(#route{topic = Topic, dest = node()}); -delete_route(Route = #route{topic = Topic}) -> - case get_mode() of - protected -> do_delete_route(Route); - undefined -> call(pick(Topic), {delete_route, Route}) - end. - --spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). -delete_route(Topic, Dest) when is_binary(Topic) -> - delete_route(#route{topic = Topic, dest = Dest}). - -%% @private -do_delete_route(Route = #route{topic = Topic}) -> - case emqx_topic:wildcard(Topic) of - true -> trans(fun del_trie_route/1, [Route]); - false -> del_direct_route(Route) - end. - -spec(has_routes(emqx_topic:topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> ets:member(?ROUTE, Topic). --spec(topics() -> list(emqx_topic:topic())). -topics() -> mnesia:dirty_all_keys(?ROUTE). +-spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}). +delete_route(Topic) when is_binary(Topic) -> + delete_route(Topic, node()). -%% @doc Match routes -%% Optimize: routing table will be replicated to all router nodes. --spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). -match_routes(Topic) when is_binary(Topic) -> - Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), - lists:append([get_routes(To) || To <- [Topic | Matched]]). +-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +delete_route(Topic, Dest) when is_binary(Topic) -> + call(pick(Topic), {delete_route, Topic, Dest}). + +-spec(do_delete_route(emqx_topic:topic()) -> ok | {error, term()}). +do_delete_route(Topic) when is_binary(Topic) -> + do_delete_route(Topic, node()). + +-spec(do_delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +do_delete_route(Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> trans(fun delete_trie_route/1, [Route]); + false -> delete_direct_route(Route) + end. + +-spec(topics() -> list(emqx_topic:topic())). +topics() -> + mnesia:dirty_all_keys(?ROUTE). %% @doc Print routes to a topic -spec(print_routes(emqx_topic:topic()) -> ok). @@ -143,13 +144,6 @@ print_routes(Topic) -> io:format("~s -> ~s~n", [To, Dest]) end, match_routes(Topic)). --spec(set_mode(protected | atom()) -> any()). -set_mode(Mode) when is_atom(Mode) -> - put('$router_mode', Mode). - --spec(get_mode() -> protected | undefined | atom()). -get_mode() -> get('$router_mode'). - call(Router, Msg) -> gen_server:call(Router, Msg, infinity). @@ -164,11 +158,13 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({add_route, Route}, _From, State) -> - {reply, do_add_route(Route), State}; +handle_call({add_route, Topic, Dest}, _From, State) -> + Ok = do_add_route(Topic, Dest), + {reply, Ok, State}; -handle_call({delete_route, Route}, _From, State) -> - {reply, do_delete_route(Route), State}; +handle_call({delete_route, Topic, Dest}, _From, State) -> + Ok = do_delete_route(Topic, Dest), + {reply, Ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Router] unexpected call: ~p", [Req]), @@ -192,23 +188,23 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -add_direct_route(Route) -> +insert_direct_route(Route) -> mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). -add_trie_route(Route = #route{topic = Topic}) -> +insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [] -> emqx_trie:insert(Topic); _ -> ok end, mnesia:write(?ROUTE, Route, sticky_write). -del_direct_route(Route) -> +delete_direct_route(Route) -> mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). -del_trie_route(Route = #route{topic = Topic}) -> +delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [Route] -> %% Remove route and trie - mnesia:delete_object(?ROUTE, Route, sticky_write), + ok = mnesia:delete_object(?ROUTE, Route, sticky_write), emqx_trie:delete(Topic); [_|_] -> %% Remove route only mnesia:delete_object(?ROUTE, Route, sticky_write); @@ -219,7 +215,7 @@ del_trie_route(Route = #route{topic = Topic}) -> -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> case mnesia:transaction(Fun, Args) of - {atomic, _} -> ok; - {aborted, Error} -> {error, Error} + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} end. diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index c24b10715..efeaabc74 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -31,15 +31,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% internal export +%% Internal export -export([stats_fun/0]). -record(routing_node, {name, const = unused}). --record(state, {nodes = []}). --compile({no_auto_import, [monitor/1]}). - --define(SERVER, ?MODULE). -define(ROUTE, emqx_route). -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). @@ -64,9 +60,9 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ %% @doc Starts the router helper --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). %% @doc Monitor routing node -spec(monitor(node() | {binary(), node()}) -> ok). @@ -84,18 +80,18 @@ monitor(Node) when is_atom(Node) -> %%------------------------------------------------------------------------------ init([]) -> - _ = ekka:monitor(membership), - _ = mnesia:subscribe({table, ?ROUTING_NODE, simple}), + ok = ekka:monitor(membership), + {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}), Nodes = lists:foldl( fun(Node, Acc) -> case ekka:is_member(Node) of true -> Acc; - false -> _ = erlang:monitor_node(Node, true), + false -> true = erlang:monitor_node(Node, true), [Node | Acc] end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), - {ok, #state{nodes = Nodes}, hibernate}. + {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[RouterHelper] unexpected call: ~p", [Req]), @@ -105,24 +101,29 @@ handle_cast(Msg, State) -> emqx_logger:error("[RouterHelper] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, #routing_node{name = Node}, _}}, State = #state{nodes = Nodes}) -> - emqx_logger:info("[RouterHelper] write routing node: ~s", [Node]), +handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> case ekka:is_member(Node) orelse lists:member(Node, Nodes) of - true -> {noreply, State}; - false -> _ = erlang:monitor_node(Node, true), - {noreply, State#state{nodes = [Node | Nodes]}} + true -> {noreply, State}; + false -> + true = erlang:monitor_node(Node, true), + {noreply, State#{nodes := [Node | Nodes]}} end; -handle_info({mnesia_table_event, _Event}, State) -> +handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> + %% ignore {noreply, State}; -handle_info({nodedown, Node}, State = #state{nodes = Nodes}) -> +handle_info({mnesia_table_event, Event}, State) -> + emqx_logger:error("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]), + {noreply, State}; + +handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> mnesia:transaction(fun cleanup_routes/1, [Node]) end), - mnesia:dirty_delete(?ROUTING_NODE, Node), - {noreply, State#state{nodes = lists:delete(Node, Nodes)}, hibernate}; + ok = mnesia:dirty_delete(?ROUTING_NODE, Node), + {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({nodedown, Node}, State); @@ -134,8 +135,8 @@ handle_info(Info, State) -> emqx_logger:error("[RouteHelper] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> - ekka:unmonitor(membership), +terminate(_Reason, _State) -> + ok = ekka:unmonitor(membership), emqx_stats:cancel_update(route_stats), mnesia:unsubscribe({table, ?ROUTING_NODE, simple}). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index d1d0d921d..b7e41213b 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -252,7 +252,6 @@ subscribers(Group, Topic) -> %%------------------------------------------------------------------------------ init([]) -> - _ = emqx_router:set_mode(protected), mnesia:subscribe({table, ?TAB, simple}), {atomic, PMon} = mnesia:transaction(fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), @@ -269,7 +268,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:add_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) end, ok = maybe_insert_alive_tab(SubPid), true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), @@ -280,7 +279,7 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end, {reply, ok, State}; @@ -334,7 +333,7 @@ cleanup_down(SubPid) -> true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 79f6042b7..27ff52827 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -36,7 +36,7 @@ %% @doc Create or replicate trie tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - %% Optimize + %% Optimize storage StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true}]}], %% Trie table @@ -72,7 +72,7 @@ insert(Topic) when is_binary(Topic) -> write_trie_node(TrieNode#trie_node{topic = Topic}); [] -> %% Add trie path - lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), + ok = lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), %% Add last node write_trie_node(#trie_node{node_id = Topic, topic = Topic}) end. @@ -93,7 +93,7 @@ lookup(NodeId) -> delete(Topic) when is_binary(Topic) -> case mnesia:wread({?TRIE_NODE, Topic}) of [#trie_node{edge_count = 0}] -> - mnesia:delete({?TRIE_NODE, Topic}), + ok = mnesia:delete({?TRIE_NODE, Topic}), delete_path(lists:reverse(emqx_topic:triples(Topic))); [TrieNode] -> write_trie_node(TrieNode#trie_node{topic = undefined}); @@ -112,12 +112,12 @@ add_path({Node, Word, Child}) -> [TrieNode = #trie_node{edge_count = Count}] -> case mnesia:wread({?TRIE, Edge}) of [] -> - write_trie_node(TrieNode#trie_node{edge_count = Count + 1}), + ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}), write_trie(#trie{edge = Edge, node_id = Child}); [_] -> ok end; [] -> - write_trie_node(#trie_node{node_id = Node, edge_count = 1}), + ok = write_trie_node(#trie_node{node_id = Node, edge_count = 1}), write_trie(#trie{edge = Edge, node_id = Child}) end. @@ -154,10 +154,10 @@ match_node(NodeId, [W|Words], ResAcc) -> delete_path([]) -> ok; delete_path([{NodeId, Word, _} | RestPath]) -> - mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), - case mnesia:read(?TRIE_NODE, NodeId) of + ok = mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), + case mnesia:wread({?TRIE_NODE, NodeId}) of [#trie_node{edge_count = 1, topic = undefined}] -> - mnesia:delete({?TRIE_NODE, NodeId}), + ok = mnesia:delete({?TRIE_NODE, NodeId}), delete_path(RestPath); [TrieNode = #trie_node{edge_count = 1, topic = _}] -> write_trie_node(TrieNode#trie_node{edge_count = 0}); @@ -167,9 +167,11 @@ delete_path([{NodeId, Word, _} | RestPath]) -> mnesia:abort({node_not_found, NodeId}) end. +%% @private write_trie(Trie) -> mnesia:write(?TRIE, Trie, write). +%% @private write_trie_node(TrieNode) -> mnesia:write(?TRIE_NODE, TrieNode, write). diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index e317ec7b3..c115fd0cd 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -21,17 +21,16 @@ -compile(nowarn_export_all). -define(R, emqx_router). --define(TABS, [emqx_route, emqx_trie, emqx_trie_node]). all() -> [{group, route}]. groups() -> [{route, [sequence], - [add_del_route, - match_routes, - has_routes, - router_add_del]}]. + [t_add_delete, + t_do_add_delete, + t_match_routes, + t_has_routes]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -47,77 +46,47 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -add_del_route(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"a/b/c">>, node()), - timer:sleep(1), - - ?R:add_route(From, <<"a/b/c">>, node()), - timer:sleep(1), - - ?R:add_route(From, <<"a/+/b">>, node()), - ct:log("Topics: ~p ~n", [emqx_topic:wildcard(<<"a/+/b">>)]), - timer:sleep(1), - +t_add_delete(_) -> + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/+/b">>, node()), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), - ?R:del_route(From, <<"a/b/c">>, node()), + ?R:delete_route(<<"a/b/c">>), + ?R:delete_route(<<"a/+/b">>, node()), + ?assertEqual([], ?R:topics()). - ?R:del_route(From, <<"a/+/b">>, node()), - timer:sleep(120), - ?assertEqual([], lists:sort(?R:topics())). +t_do_add_delete(_) -> + ?R:do_add_route(<<"a/b/c">>, node()), + ?R:do_add_route(<<"a/b/c">>, node()), + ?R:do_add_route(<<"a/+/b">>, node()), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), -match_routes(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"a/b/c">>, node()), - ?R:add_route(From, <<"a/+/c">>, node()), - ?R:add_route(From, <<"a/b/#">>, node()), - ?R:add_route(From, <<"#">>, node()), - timer:sleep(1000), + ?R:do_delete_route(<<"a/b/c">>, node()), + ?R:do_delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_match_routes(_) -> + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/+/c">>, node()), + ?R:add_route(<<"a/b/#">>, node()), + ?R:add_route(<<"#">>, node()), ?assertEqual([#route{topic = <<"#">>, dest = node()}, #route{topic = <<"a/+/c">>, dest = node()}, #route{topic = <<"a/b/#">>, dest = node()}, #route{topic = <<"a/b/c">>, dest = node()}], - lists:sort(?R:match_routes(<<"a/b/c">>))). + lists:sort(?R:match_routes(<<"a/b/c">>))), + ?R:delete_route(<<"a/b/c">>, node()), + ?R:delete_route(<<"a/+/c">>, node()), + ?R:delete_route(<<"a/b/#">>, node()), + ?R:delete_route(<<"#">>, node()), + ?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))). -has_routes(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"devices/+/messages">>, node()), - timer:sleep(200), - ?assert(?R:has_routes(<<"devices/+/messages">>)). +t_has_routes(_) -> + ?R:add_route(<<"devices/+/messages">>, node()), + ?assert(?R:has_routes(<<"devices/+/messages">>)), + ?R:delete_route(<<"devices/+/messages">>). clear_tables() -> - lists:foreach(fun mnesia:clear_table/1, ?TABS). - -router_add_del(_) -> - ?R:add_route(<<"#">>), - ?R:add_route(<<"a/b/c">>, node()), - ?R:add_route(<<"+/#">>), - Routes = [R1, R2 | _] = [ - #route{topic = <<"#">>, dest = node()}, - #route{topic = <<"+/#">>, dest = node()}, - #route{topic = <<"a/b/c">>, dest = node()}], - timer:sleep(500), - ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), - - ?R:print_routes(<<"a/b/c">>), - - %% Batch Add - lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), - ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), - - %% Del - ?R:del_route(<<"a/b/c">>, node()), - timer:sleep(500), - [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)), - {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), - - %% Batch Del - R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'}, - ?R:add_route(R3), - ?R:del_route(<<"#">>), - ?R:del_route(R2), - ?R:del_route(R3), - timer:sleep(500), - [] = lists:sort(?R:match_routes(<<"a/b/c">>)). + lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]). diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 85637a447..09226979f 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -47,36 +47,36 @@ t_insert(_) -> edge_count = 3, topic = <<"sensor">>, flags = undefined}, - {atomic, [TN]} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/+/#">>), - ?TRIE:insert(<<"sensor/#">>), - ?TRIE:insert(<<"sensor">>), - ?TRIE:insert(<<"sensor">>), - ?TRIE:lookup(<<"sensor">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:lookup(<<"sensor">>) + end, + ?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)). t_match(_) -> Machted = [<<"sensor/+/#">>, <<"sensor/#">>], - {atomic, Machted} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/+/#">>), - ?TRIE:insert(<<"sensor/#">>), - ?TRIE:match(<<"sensor/1">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:match(<<"sensor/1">>) + end, + ?assertEqual({atomic, Machted}, mnesia:transaction(Fun)). t_match2(_) -> Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []}, - {atomic, Matched} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"#">>), - ?TRIE:insert(<<"+/#">>), - ?TRIE:insert(<<"+/+/#">>), - {?TRIE:match(<<"a/b/c">>), - ?TRIE:match(<<"$SYS/broker/zenmq">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"#">>), + ?TRIE:insert(<<"+/#">>), + ?TRIE:insert(<<"+/+/#">>), + {?TRIE:match(<<"a/b/c">>), + ?TRIE:match(<<"$SYS/broker/zenmq">>)} + end, + ?assertEqual({atomic, Matched}, mnesia:transaction(Fun)). t_match3(_) -> Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], @@ -91,43 +91,42 @@ t_delete(_) -> edge_count = 2, topic = undefined, flags = undefined}, - {atomic, [TN]} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/#">>), - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/1/metric/3">>), - ?TRIE:delete(<<"sensor/1/metric/2">>), - ?TRIE:delete(<<"sensor/1/metric">>), - ?TRIE:delete(<<"sensor/1/metric">>), - ?TRIE:lookup(<<"sensor/1">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/#">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:lookup(<<"sensor/1">>) + end, + ?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)). t_delete2(_) -> - {atomic, {[], []}} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor">>), - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/1/metric/3">>), - ?TRIE:delete(<<"sensor">>), - ?TRIE:delete(<<"sensor/1/metric/2">>), - ?TRIE:delete(<<"sensor/1/metric/3">>), - {?TRIE:lookup(<<"sensor">>), - ?TRIE:lookup(<<"sensor/1">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric/3">>), + {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/1">>)} + end, + ?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)). t_delete3(_) -> - {atomic, {[], []}} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/+">>), - ?TRIE:insert(<<"sensor/+/metric/2">>), - ?TRIE:insert(<<"sensor/+/metric/3">>), - ?TRIE:delete(<<"sensor/+/metric/2">>), - ?TRIE:delete(<<"sensor/+/metric/3">>), - ?TRIE:delete(<<"sensor">>), - ?TRIE:delete(<<"sensor/+">>), - ?TRIE:delete(<<"sensor/+/unknown">>), - {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/+">>), + ?TRIE:insert(<<"sensor/+/metric/2">>), + ?TRIE:insert(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor/+/metric/2">>), + ?TRIE:delete(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/+">>), + ?TRIE:delete(<<"sensor/+/unknown">>), + {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)} + end, + ?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)). clear_tables() -> lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS). From 7074707d6459a7698c490c044439adbfb7b1da16 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 11 Dec 2018 14:06:23 +0800 Subject: [PATCH 2/4] Add t_mnesia/1 test case --- test/emqx_trie_SUITE.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 09226979f..500fe3574 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -24,7 +24,7 @@ -define(TRIE_TABS, [emqx_trie, emqx_trie_node]). all() -> - [t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. + [t_mnesia, t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. init_per_suite(Config) -> application:load(emqx), @@ -42,6 +42,9 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). +t_mnesia(_) -> + ok = ?TRIE:mnesia(copy). + t_insert(_) -> TN = #trie_node{node_id = <<"sensor">>, edge_count = 3, From 47e3cd3692722400153e95463cca8d8532615607 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 12 Dec 2018 13:34:13 +0800 Subject: [PATCH 3/4] Improve the subscription sharding. --- src/emqx.erl | 6 +- src/emqx_broker.erl | 217 +++++++++++++++++++---------------- src/emqx_broker_helper.erl | 80 ++++++++----- src/emqx_router_helper.erl | 2 +- src/emqx_sequence.erl | 1 + src/emqx_sm.erl | 2 +- src/emqx_tables.erl | 14 +++ test/emqx_sequence_SUITE.erl | 3 +- test/emqx_tables_SUITE.erl | 8 +- 9 files changed, 197 insertions(+), 136 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 3792cc4f8..76e966a59 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -22,7 +22,7 @@ %% PubSub API -export([subscribe/1, subscribe/2, subscribe/3]). -export([publish/1]). --export([unsubscribe/1, unsubscribe/2]). +-export([unsubscribe/1]). %% PubSub management API -export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). @@ -88,10 +88,6 @@ publish(Msg) -> unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid()) -> ok). -unsubscribe(Topic, SubId) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId). - %%------------------------------------------------------------------------------ %% PubSub management API %%------------------------------------------------------------------------------ diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index a00dc17b8..9ed4aad06 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -20,7 +20,7 @@ -export([start_link/2]). -export([subscribe/1, subscribe/2, subscribe/3]). --export([unsubscribe/1, unsubscribe/2]). +-export([unsubscribe/1]). -export([subscriber_down/1]). -export([publish/1, safe_publish/1]). -export([dispatch/2]). @@ -35,6 +35,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-import(emqx_tables, [lookup_value/2, lookup_value/3]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -42,8 +44,7 @@ -define(BROKER, ?MODULE). -%% ETS tables --define(SUBID, emqx_subid). +%% ETS tables for PubSub -define(SUBOPTION, emqx_suboption). -define(SUBSCRIBER, emqx_subscriber). -define(SUBSCRIPTION, emqx_subscription). @@ -65,9 +66,6 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubId: SubId -> SubPid - ok = emqx_tables:new(?SUBID, [set | TabOpts]), - %% SubOption: {SubPid, Topic} -> SubOption ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]), @@ -76,7 +74,7 @@ create_tabs() -> ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... - %% bag: o(n) insert + %% bag: o(n) insert:( ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ @@ -98,28 +96,37 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map SubPid = self(), case ets:member(?SUBOPTION, {SubPid, Topic}) of false -> - ok = emqx_broker_helper:monitor(SubPid, SubId), - %% true = ets:insert(?SUBID, {SubId, SubPid}), - true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - case maps:get(share, SubOpts, undefined) of - undefined -> - Shard = emqx_broker_helper:get_shard(SubPid, Topic), - case Shard of - 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); - I -> - true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) - end, - SubOpts1 = maps:put(shard, Shard, SubOpts), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shard}), {subscribe, Topic}); - Group -> %% Shard subscription - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid) - end; + ok = emqx_broker_helper:monitor_sub(SubPid, SubId), + do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); true -> ok end. +with_subid(undefined, SubOpts) -> + SubOpts; +with_subid(SubId, SubOpts) -> + maps:put(subid, SubId, SubOpts). + +%% @private +do_subscribe(Topic, SubPid, SubOpts) -> + true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), + Group = maps:get(share, SubOpts, undefined), + do_subscribe(Group, Topic, SubPid, SubOpts). + +do_subscribe(undefined, Topic, SubPid, SubOpts) -> + case emqx_broker_helper:get_sub_shard(SubPid, Topic) of + 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + call(pick(Topic), {subscribe, Topic}); + I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}), + call(pick({Topic, I}), {subscribe, Topic, I}) + end; + +%% Shared subscription +do_subscribe(Group, Topic, SubPid, SubOpts) -> + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + emqx_shared_sub:subscribe(Group, Topic, SubPid). + %%------------------------------------------------------------------------------ %% Unsubscribe API %%------------------------------------------------------------------------------ @@ -130,33 +137,26 @@ unsubscribe(Topic) when is_binary(Topic) -> case ets:lookup(?SUBOPTION, {SubPid, Topic}) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(share, SubOpts, undefined) of - undefined -> - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shard, Topic, I}) of - true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) - end, - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; - Group -> - ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid) - end, - true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), - %%true = ets:delete_object(?SUBID, {SubId, SubPid}), - true = ets:delete(?SUBOPTION, {SubPid, Topic}), - ok; + do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok end. --spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok). -unsubscribe(Topic, _SubId) when is_binary(Topic) -> - unsubscribe(Topic). +do_unsubscribe(Topic, SubPid, SubOpts) -> + true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), + Group = maps:get(share, SubOpts, undefined), + do_unsubscribe(Group, Topic, SubPid, SubOpts). + +do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> + case maps:get(shard, SubOpts, 0) of + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; + +do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> + emqx_shared_sub:unsubscribe(Group, Topic, SubPid). %%------------------------------------------------------------------------------ %% Publish @@ -241,23 +241,28 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> inc_dropped_cnt(Topic), Delivery; [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), - Delivery#delivery{results = [{dispatch, Topic, 1}|Results]}; + Cnt = dispatch(Sub, Topic, Msg), + Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}; Subs -> - Count = lists:foldl( - fun(Sub, Acc) -> - dispatch(Sub, Topic, Msg), Acc + 1 - end, 0, Subs), - Delivery#delivery{results = [{dispatch, Topic, Count}|Results]} + Cnt = lists:foldl( + fun(Sub, Acc) -> + dispatch(Sub, Topic, Msg) + Acc + end, 0, Subs), + Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]} end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}; + case erlang:is_process_alive(SubPid) of + true -> + SubPid ! {dispatch, Topic, Msg}, + 1; + false -> 0 + end; dispatch({shard, I}, Topic, Msg) -> - - lists:foreach(fun(SubPid) -> - SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])). + lists:foldl( + fun(SubPid, Cnt) -> + dispatch(SubPid, Topic, Msg) + Cnt + end, 0, subscribers({shard, Topic, I})). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -265,8 +270,10 @@ inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). -spec(subscribers(emqx_topic:topic()) -> [pid()]). -subscribers(Topic) -> - safe_lookup_element(?SUBSCRIBER, Topic, []). +subscribers(Topic) when is_binary(Topic) -> + lookup_value(?SUBSCRIBER, Topic, []); +subscribers(Shard = {shard, _Topic, _I}) -> + lookup_value(?SUBSCRIBER, Shard, []). %%------------------------------------------------------------------------------ %% Subscriber is down @@ -275,27 +282,21 @@ subscribers(Topic) -> -spec(subscriber_down(pid()) -> true). subscriber_down(SubPid) -> lists:foreach( - fun(Sub = {_Pid, Topic}) -> - case ets:lookup(?SUBOPTION, Sub) of - [{_, SubOpts}] -> + fun(Topic) -> + case lookup_value(?SUBOPTION, {SubPid, Topic}) of + SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), + true = ets:delete(?SUBOPTION, {SubPid, Topic}), case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shard, Topic, I}) of - true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) - end, - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end, - ets:delete(?SUBOPTION, Sub); - [] -> ok + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; + undefined -> ok end - end, ets:lookup(?SUBSCRIPTION, SubPid)), - true = ets:delete(?SUBSCRIPTION, SubPid). + end, lookup_value(?SUBSCRIPTION, SubPid, [])), + ets:delete(?SUBSCRIPTION, SubPid). %%------------------------------------------------------------------------------ %% Management APIs @@ -303,20 +304,32 @@ subscriber_down(SubPid) -> -spec(subscriptions(pid() | emqx_types:subid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). -subscriptions(SubPid) -> - [{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})} - || Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])]. +subscriptions(SubPid) when is_pid(SubPid) -> + [{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} + || Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])]; +subscriptions(SubId) -> + case emqx_broker_helper:lookup_subpid(SubId) of + SubPid when is_pid(SubPid) -> + subscriptions(SubPid); + undefined -> [] + end. -spec(subscribed(pid(), emqx_topic:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); subscribed(SubId, Topic) when ?is_subid(SubId) -> - %%FIXME:... SubId -> SubPid - ets:member(?SUBOPTION, {SubId, Topic}). + SubPid = emqx_broker_helper:lookup_subpid(SubId), + ets:member(?SUBOPTION, {SubPid, Topic}). --spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()). +-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts() | undefined). get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> - safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}). + lookup_value(?SUBOPTION, {SubPid, Topic}); +get_subopts(SubId, Topic) when ?is_subid(SubId) -> + case emqx_broker_helper:lookup_subpid(SubId) of + SubPid when is_pid(SubPid) -> + get_subopts(SubPid, Topic); + undefined -> undefined + end. -spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()). set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> @@ -331,9 +344,6 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> topics() -> emqx_router:topics(). -safe_lookup_element(Tab, Key, Def) -> - try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end. - %%------------------------------------------------------------------------------ %% Stats fun %%------------------------------------------------------------------------------ @@ -372,10 +382,15 @@ init([Pool, Id]) -> {ok, #{pool => Pool, id => Id}}. handle_call({subscribe, Topic}, _From, State) -> - Ok = case get(Topic) of + Ok = emqx_router:do_add_route(Topic), + {reply, Ok, State}; + +handle_call({subscribe, Topic, I}, _From, State) -> + Ok = case get(Shard = {Topic, I}) of undefined -> - _ = put(Topic, true), - emqx_router:do_add_route(Topic); + _ = put(Shard, true), + true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}), + cast(pick(Topic), {subscribe, Topic}); true -> ok end, {reply, Ok, State}; @@ -384,11 +399,18 @@ handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast({subscribe, Topic}, State) -> + case emqx_router:do_add_route(Topic) of + ok -> ok; + {error, Reason} -> + emqx_logger:error("[Broker] Failed to add route: ~p", [Reason]) + end, + {noreply, State}; + handle_cast({unsubscribed, Topic}, State) -> case ets:member(?SUBSCRIBER, Topic) of false -> - _ = erase(Topic), - emqx_router:do_delete_route(Topic); + _ = emqx_router:do_delete_route(Topic); true -> ok end, {noreply, State}; @@ -396,6 +418,7 @@ handle_cast({unsubscribed, Topic}, State) -> handle_cast({unsubscribed, Topic, I}, State) -> case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> + _ = erase({Topic, I}), true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index d3e7f9d37..7d514e31d 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -16,44 +16,56 @@ -behaviour(gen_server). --compile({no_auto_import, [monitor/2]}). - -export([start_link/0]). --export([monitor/2]). --export([get_shard/2]). +-export([register_sub/2]). +-export([lookup_subid/1, lookup_subpid/1]). +-export([get_sub_shard/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(HELPER, ?MODULE). +-define(SUBID, emqx_subid). -define(SUBMON, emqx_submon). -define(SUBSEQ, emqx_subseq). - --record(state, {pmon :: emqx_pmon:pmon()}). +-define(SHARD, 1024). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?HELPER}, ?MODULE, [], []). --spec(monitor(pid(), emqx_types:subid()) -> ok). -monitor(SubPid, SubId) when is_pid(SubPid) -> +-spec(register_sub(pid(), emqx_types:subid()) -> ok). +register_sub(SubPid, SubId) when is_pid(SubPid) -> case ets:lookup(?SUBMON, SubPid) of [] -> - gen_server:cast(?HELPER, {monitor, SubPid, SubId}); + gen_server:cast(?HELPER, {register_sub, SubPid, SubId}); [{_, SubId}] -> ok; _Other -> error(subid_conflict) end. --spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shard(SubPid, Topic) -> +-spec(lookup_subid(pid()) -> emqx_types:subid() | undefined). +lookup_subid(SubPid) when is_pid(SubPid) -> + emqx_tables:lookup_value(?SUBMON, SubPid). + +-spec(lookup_subpid(emqx_types:subid()) -> pid()). +lookup_subpid(SubId) -> + emqx_tables:lookup_value(?SUBID, SubId). + +-spec(get_sub_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_sub_shard(SubPid, Topic) -> case create_seq(Topic) of - Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) + Seq when Seq =< ?SHARD -> 0; + _ -> erlang:phash2(SubPid, shards_num()) + 1 end. +-spec(shards_num() -> pos_integer()). +shards_num() -> + %% Dynamic sharding later... + ets:lookup_element(?HELPER, shards, 2). + -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). create_seq(Topic) -> emqx_sequence:nextval(?SUBSEQ, Topic). @@ -67,41 +79,55 @@ reclaim_seq(Topic) -> %%------------------------------------------------------------------------------ init([]) -> + %% Helper table + ok = emqx_tables:new(?HELPER, [{read_concurrency, true}]), + %% Shards: CPU * 32 + true = ets:insert(?HELPER, {shards, emqx_vm:schedulers() * 32}), %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shards: CPU * 32 - true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), + %% SubId: SubId -> SubPid + ok = emqx_tables:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]), %% SubMon: SubPid -> SubId - ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), + ok = emqx_tables:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer - emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), - {ok, #state{pmon = emqx_pmon:new()}, hibernate}. + ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]), - {reply, ignored, State}. + {reply, ignored, State}. -handle_cast({monitor, SubPid, SubId}, State = #state{pmon = PMon}) -> +handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> + true = (SubId =:= undefined) orelse ets:insert(?SUBID, {SubId, SubPid}), true = ets:insert(?SUBMON, {SubPid, SubId}), - {noreply, State#state{pmon = emqx_pmon:monitor(SubPid, PMon)}}; + {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - true = ets:delete(?SUBMON, SubPid), - ok = emqx_pool:async_submit(fun emqx_broker:subscriber_down/1, [SubPid]), - {noreply, State#state{pmon = emqx_pmon:erase(SubPid, PMon)}}; +handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #{pmon := PMon}) -> + case ets:lookup(?SUBMON, SubPid) of + [{_, SubId}] -> + ok = emqx_pool:async_submit(fun subscriber_down/2, [SubPid, SubId]); + [] -> + emqx_logger:error("[BrokerHelper] unexpected DOWN: ~p, reason: ~p", [SubPid, Reason]) + end, + {noreply, State#{pmon := emqx_pmon:erase(SubPid, PMon)}}; handle_info(Info, State) -> emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> - _ = emqx_sequence:delete(?SUBSEQ), +terminate(_Reason, _State) -> + true = emqx_sequence:delete(?SUBSEQ), emqx_stats:cancel_update(broker_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. +subscriber_down(SubPid, SubId) -> + true = ets:delete(?SUBMON, SubPid), + true = (SubId =:= undefined) orelse ets:delete_object(?SUBID, {SubId, SubPid}), + emqx_broker:subscriber_down(SubPid). + diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index efeaabc74..c32800a24 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -90,7 +90,7 @@ init([]) -> [Node | Acc] end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), - emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), + ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl index 022531df5..33bb5edda 100644 --- a/src/emqx_sequence.erl +++ b/src/emqx_sequence.erl @@ -51,6 +51,7 @@ reclaim(Name, Key) -> end. %% @doc Delete the sequence. +-spec(delete(name()) -> boolean()). delete(Name) -> case ets:info(Name, name) of Name -> ets:delete(Name); diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index d178a8ae7..637e44b0c 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -206,7 +206,7 @@ init([]) -> ok = emqx_tables:new(?SESSION_P_TAB, TabOpts), ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), - emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), + ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), {ok, #{session_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 9b3ebfeae..fdb106a99 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -15,6 +15,7 @@ -module(emqx_tables). -export([new/2]). +-export([lookup_value/2, lookup_value/3]). %% Create a named_table ets. -spec(new(atom(), list()) -> ok). @@ -26,3 +27,16 @@ new(Tab, Opts) -> Tab -> ok end. +%% KV lookup +-spec(lookup_value(atom(), term()) -> any()). +lookup_value(Tab, Key) -> + lookup_value(Tab, Key, undefined). + +-spec(lookup_value(atom(), term(), any()) -> any()). +lookup_value(Tab, Key, Def) -> + try + ets:lookup_element(Tab, Key, 2) + catch + error:badarg -> Def + end. + diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl index f37b60d76..1ac0ea308 100644 --- a/test/emqx_sequence_SUITE.erl +++ b/test/emqx_sequence_SUITE.erl @@ -33,5 +33,6 @@ sequence_generate(_) -> ?assertEqual(1, reclaim(seqtab, key)), ?assertEqual(0, reclaim(seqtab, key)), ?assertEqual(false, ets:member(seqtab, key)), - ?assertEqual(1, nextval(seqtab, key)). + ?assertEqual(1, nextval(seqtab, key)), + ?assert(emqx_sequence:delete(seqtab). diff --git a/test/emqx_tables_SUITE.erl b/test/emqx_tables_SUITE.erl index 95590b0e9..1002c0a0b 100644 --- a/test/emqx_tables_SUITE.erl +++ b/test/emqx_tables_SUITE.erl @@ -20,7 +20,7 @@ all() -> [t_new]. t_new(_) -> - TId = emqx_tables:new(test_table, [{read_concurrency, true}]), - ets:insert(TId, {loss, 100}), - TId = emqx_tables:new(test_table, [{read_concurrency, true}]), - 100 = ets:lookup_element(TId, loss, 2). + ok = emqx_tables:new(test_table, [{read_concurrency, true}]), + ets:insert(test_table, {key, 100}), + ok = emqx_tables:new(test_table, [{read_concurrency, true}]), + 100 = ets:lookup_element(test_table, key, 2). From 99872b253fd2a64c60902143c166890706f184d0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 12 Dec 2018 14:53:22 +0800 Subject: [PATCH 4/4] Fix 'function not exported' crash --- src/emqx_broker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 9ed4aad06..105589c65 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -96,7 +96,7 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map SubPid = self(), case ets:member(?SUBOPTION, {SubPid, Topic}) of false -> - ok = emqx_broker_helper:monitor_sub(SubPid, SubId), + ok = emqx_broker_helper:register_sub(SubPid, SubId), do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); true -> ok end.