diff --git a/include/emqx.hrl b/include/emqx.hrl index a0b96c1c7..cb06118da 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -154,10 +154,7 @@ %% Route %%-------------------------------------------------------------------- --record(route, - { topic :: topic(), - dest :: {binary(), node()} | node() - }). +-record(route, { topic :: topic(), dest }). -type(route() :: #route{}). @@ -170,7 +167,7 @@ -record(trie_node, { node_id :: trie_node_id(), edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined, + topic :: topic() | undefined, flags :: list(atom()) }). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index cb422b9e4..79f7911f7 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -32,30 +32,33 @@ -export([topics/0]). --export([getopts/2, setopts/3]). - --export([dump/0]). +-export([get_subopts/2, set_subopts/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, subids :: map(), submon :: emqx_pmon:pmon()}). +-record(state, {pool, id, submon}). -define(BROKER, ?MODULE). -define(TIMEOUT, 120000). +%% ETS tables +-define(SUBOPTION, emqx_suboption). +-define(SUBSCRIBER, emqx_subscriber). +-define(SUBSCRIPTION, emqx_subscription). + %%-------------------------------------------------------------------- %% Start a broker %%-------------------------------------------------------------------- -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> - gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 2000}]). %%-------------------------------------------------------------------- -%% Sub/Unsub +%% Subscriber/Unsubscribe %%-------------------------------------------------------------------- -spec(subscribe(topic()) -> ok | {error, term()}). @@ -108,7 +111,7 @@ publish(Msg = #message{from = From}) -> end. publish(Topic, Msg) -> - route(emqx_router:match_routes(Topic), delivery(Msg)). + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg)). route([], Delivery = #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [undefined, Msg]), @@ -120,14 +123,29 @@ route([{To, Node}], Delivery) when Node =:= node() -> route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); -route([{To, Group}], Delivery) when is_binary(Group) -> - emqx_shared_sub:dispatch(Group, To, Delivery); +route([{To, Shared}], Delivery) when is_tuple(Shared); is_binary(Shared) -> + emqx_shared_sub:dispatch(Shared, To, Delivery); route(Routes, Delivery) -> lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). +aggre([]) -> + []; +aggre([{To, Dest}]) -> + [{To, Dest}]; +aggre(Routes) -> + lists:foldl( + fun({To, Node}, Acc) when is_atom(Node) -> + [{To, Node} | Acc]; + ({To, {Group, _Node}}, Acc) -> + lists:usort([{To, Group} | Acc]); + ({To, {Cluster, Group, _Node}}, Acc) -> + lists:usort([{To, {Cluster, Group}} | Acc]) + end, [], Routes). + %% @doc Forward message to another node. forward(Node, To, Delivery) -> + %% rpc:call to ensure the delivery, but the latency:( case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> emqx_log:error("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]), @@ -169,33 +187,37 @@ delivery(Msg) -> #delivery{message = Msg, flows = []}. subscribers(Topic) -> - try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. + try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end. subscriptions(Subscriber) -> lists:map(fun({_, {share, _Group, Topic}}) -> subscription(Topic, Subscriber); ({_, Topic}) -> subscription(Topic, Subscriber) - end, ets:lookup(subscription, Subscriber)). + end, ets:lookup(?SUBSCRIPTION, Subscriber)). subscription(Topic, Subscriber) -> - {Topic, ets:lookup_element(suboption, {Topic, Subscriber}, 2)}. + {Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}. -spec(subscribed(topic(), subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - ets:member(suboption, {Topic, SubPid}); + ets:member(?SUBOPTION, {Topic, SubPid}); subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) -> - length(ets:match_object(suboption, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1; -subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) -> - ets:member(suboption, {Topic, {SubId, SubPid}}). + length(ets:match_object(?SUBOPTION, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1; +subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), + is_binary(SubId), + is_pid(SubPid) -> + ets:member(?SUBOPTION, {Topic, {SubId, SubPid}}). topics() -> emqx_router:topics(). -getopts(Topic, Subscriber) when is_binary(Topic) -> - try ets:lookup_element(suboption, {Topic, Subscriber}, 2) catch error:badarg ->[] end. +get_subopts(Topic, Subscriber) when is_binary(Topic) -> + try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) + catch error:badarg -> [] + end. -setopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) -> - gen_server:call(pick(Subscriber), {setopts, Topic, Subscriber, Opts}). +set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) -> + gen_server:call(pick(Subscriber), {set_subopts, Topic, Subscriber, Opts}). with_subpid(SubPid) when is_pid(SubPid) -> SubPid; @@ -220,22 +242,19 @@ pick(SubId) when is_binary(SubId) -> pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> pick(SubId). -dump() -> - [{Tab, ets:tab2list(Tab)} || Tab <- [subscription, subscriber, suboption]]. - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([Pool, Id]) -> gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, subids = #{}, submon = emqx_pmon:new()}}. + {ok, #state{pool = Pool, id = Id, submon = emqx_pmon:new()}}. -handle_call({setopts, Topic, Subscriber, Opts}, _From, State) -> - case ets:lookup(suboption, {Topic, Subscriber}) of +handle_call({set_subopts, Topic, Subscriber, Opts}, _From, State) -> + case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of [{_, OldOpts}] -> Opts1 = lists:usort(lists:umerge(Opts, OldOpts)), - ets:insert(suboption, {{Topic, Subscriber}, Opts1}), + ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1}), {reply, ok, State}; [] -> {reply, {error, not_found}, State} @@ -246,12 +265,12 @@ handle_call(Request, _From, State) -> {reply, ignore, State}. handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> - case ets:lookup(suboption, {Topic, Subscriber}) of + case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of [] -> Group = proplists:get_value(share, Options), true = do_subscribe(Group, Topic, Subscriber, Options), emqx_shared_sub:subscribe(Group, Topic, subpid(Subscriber)), - emqx_router:add_route(From, Topic, dest(Options)), + emqx_router:add_route(From, Topic, destination(Options)), {noreply, monitor_subscriber(Subscriber, State)}; [_] -> gen_server:reply(From, ok), @@ -259,13 +278,13 @@ handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> end; handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) -> - case ets:lookup(suboption, {Topic, Subscriber}) of + case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of [{_, Options}] -> Group = proplists:get_value(share, Options), true = do_unsubscribe(Group, Topic, Subscriber), emqx_shared_sub:unsubscribe(Group, Topic, subpid(Subscriber)), - case ets:member(subscriber, Topic) of - false -> emqx_router:del_route(From, Topic, dest(Options)); + case ets:member(?SUBSCRIBER, Topic) of + false -> emqx_router:del_route(From, Topic, destination(Options)); true -> gen_server:reply(From, ok) end; [] -> gen_server:reply(From, ok) @@ -276,23 +295,24 @@ handle_cast(Msg, State) -> emqx_log:error("[Broker] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{subids = SubIds}) -> - Subscriber = case maps:find(SubPid, SubIds) of - {ok, SubId} -> {SubId, SubPid}; - error -> SubPid +handle_info({'DOWN', _MRef, process, SubPid, _Reason}, + State = #state{submon = SubMon}) -> + Subscriber = case SubMon:find(SubPid) of + undefined -> SubPid; + SubId -> {SubId, SubPid} end, Topics = lists:map(fun({_, {share, _, Topic}}) -> Topic; ({_, Topic}) -> Topic - end, ets:lookup(subscription, Subscriber)), + end, ets:lookup(?SUBSCRIPTION, Subscriber)), lists:foreach(fun(Topic) -> - case ets:lookup(suboption, {Topic, Subscriber}) of + case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of [{_, Options}] -> Group = proplists:get_value(share, Options), true = do_unsubscribe(Group, Topic, Subscriber), - case ets:member(subscriber, Topic) of - false -> emqx_router:del_route(Topic, dest(Options)); + case ets:member(?SUBSCRIBER, Topic) of + false -> emqx_router:del_route(Topic, destination(Options)); true -> ok end; [] -> ok @@ -315,25 +335,25 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- do_subscribe(Group, Topic, Subscriber, Options) -> - ets:insert(subscription, {Subscriber, shared(Group, Topic)}), - ets:insert(subscriber, {Topic, shared(Group, Subscriber)}), - ets:insert(suboption, {{Topic, Subscriber}, Options}). + ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), + ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), + ets:insert(?SUBOPTION, {{Topic, Subscriber}, Options}). do_unsubscribe(Group, Topic, Subscriber) -> - ets:delete_object(subscription, {Subscriber, shared(Group, Topic)}), - ets:delete_object(subscriber, {Topic, shared(Group, Subscriber)}), - ets:delete(suboption, {Topic, Subscriber}). + ets:delete_object(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), + ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), + ets:delete(?SUBOPTION, {Topic, Subscriber}). monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) -> State#state{submon = SubMon:monitor(SubPid)}; -monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) -> - State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}. +monitor_subscriber({SubId, SubPid}, State = #state{submon = SubMon}) -> + State#state{submon = SubMon:monitor(SubPid, SubId)}. -demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) -> - State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}. +demonitor_subscriber(SubPid, State = #state{submon = SubMon}) -> + State#state{submon = SubMon:demonitor(SubPid)}. -dest(Options) -> +destination(Options) -> case proplists:get_value(share, Options) of undefined -> node(); Group -> {Group, node()} diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index b7e2b1c5b..84bde9c6d 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -33,7 +33,8 @@ start_link() -> init([]) -> %% Create the pubsub tables - lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]), + lists:foreach(fun create_tab/1, + [subscription, subscriber, suboption]), %% Shared subscription Shared = {shared_sub, {emqx_shared_sub, start_link, []}, @@ -57,24 +58,17 @@ init([]) -> create_tab(suboption) -> %% Suboption: {Topic, Sub} -> [{qos, 1}] - ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]); + emqx_tables:create(emqx_suboption, [public, set | ?CONCURRENCY_OPTS]); create_tab(subscriber) -> %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN %% duplicate_bag: o(1) insert - ensure_tab(subscriber, [duplicate_bag | ?CONCURRENCY_OPTS]); + emqx_tables:create(emqx_subscriber, [public, duplicate_bag | ?CONCURRENCY_OPTS]); create_tab(subscription) -> %% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN %% bag: o(n) insert - ensure_tab(subscription, [bag | ?CONCURRENCY_OPTS]). - -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of - undefined -> - ets:new(Tab, lists:usort([public, named_table | Opts])); - Tab -> Tab - end. + emqx_tables:create(emqx_subscription, [public, bag | ?CONCURRENCY_OPTS]). %%-------------------------------------------------------------------- %% Stats function @@ -83,8 +77,8 @@ ensure_tab(Tab, Opts) -> stats_fun() -> fun() -> emqx_stats:setstat('subscribers/count', 'subscribers/max', - ets:info(subscriber, size)), + ets:info(emqx_subscriber, size)), emqx_stats:setstat('subscriptions/count', 'subscriptions/max', - ets:info(subscription, size)) + ets:info(emqx_subscription, size)) end. diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 58aba1e68..7d16d12d8 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -20,6 +20,8 @@ -include("emqx.hrl"). +-include_lib("ekka/include/ekka.hrl"). + %% Mnesia Bootstrap -export([mnesia/1]). @@ -32,42 +34,44 @@ %% Topics -export([topics/0]). -%% Route Management APIs +%% Route management APIs -export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]). -%% Match, print routes -export([has_routes/1, match_routes/1, print_routes/1]). --export([dump/0]). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-type(group() :: binary()). + +-type(destination() :: node() | {group(), node()} + | {cluster(), group(), node()}). + -record(state, {pool, id}). --type(destination() :: node() | {binary(), node()}). +-define(ROUTE, emqx_route). %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(route, [ + ok = ekka_mnesia:create_table(?ROUTE, [ {type, bag}, {ram_copies, [node()]}, {record_name, route}, {attributes, record_info(fields, route)}]); mnesia(copy) -> - ok = ekka_mnesia:copy_table(route). + ok = ekka_mnesia:copy_table(?ROUTE). %%-------------------------------------------------------------------- %% Start a router %%-------------------------------------------------------------------- start_link(Pool, Id) -> - gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% Add/Del Routes @@ -85,7 +89,7 @@ add_route(From, Topic, Dest) when is_binary(Topic) -> %% @doc Get routes -spec(get_routes(topic()) -> [route()]). get_routes(Topic) -> - ets:lookup(route, Topic). + ets:lookup(?ROUTE, Topic). %% @doc Delete a route -spec(del_route(topic(), destination()) -> ok). @@ -99,45 +103,29 @@ del_route(From, Topic, Dest) when is_binary(Topic) -> %% @doc Has routes? -spec(has_routes(topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> - ets:member(route, Topic). + ets:member(?ROUTE, Topic). %%-------------------------------------------------------------------- %% Topics %%-------------------------------------------------------------------- -spec(topics() -> list(binary())). -topics() -> - mnesia:dirty_all_keys(route). +topics() -> mnesia:dirty_all_keys(?ROUTE). %%-------------------------------------------------------------------- -%% Match Routes +%% Match routes %%-------------------------------------------------------------------- %% @doc Match routes +%% Optimize: routing table will be replicated to all router nodes. -spec(match_routes(Topic:: topic()) -> [{topic(), binary() | node()}]). match_routes(Topic) when is_binary(Topic) -> - %% Optimize: ets??? Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), - %% Optimize: route table will be replicated to all nodes. - aggre(lists:append([ets:lookup(route, To) || To <- [Topic | Matched]])). - -%% Aggregate routes -aggre([]) -> - []; -aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> - [{To, Node}]; -aggre([#route{topic = To, dest = {Group, _Node}}]) -> - [{To, Group}]; -aggre(Routes) -> - lists:foldl( - fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> - [{To, Node} | Acc]; - (#route{topic = To, dest = {Group, _}}, Acc) -> - lists:usort([{To, Group} | Acc]) - end, [], Routes). + Routes = [ets:lookup(?ROUTE, To) || To <- [Topic | Matched]], + [{To, Dest} || #route{topic = To, dest = Dest} <- lists:append(Routes)]. %%-------------------------------------------------------------------- -%% Print Routes +%% Print routes %%-------------------------------------------------------------------- %% @doc Print routes to a topic @@ -147,15 +135,16 @@ print_routes(Topic) -> io:format("~s -> ~s~n", [To, Dest]) end, match_routes(Topic)). +%%-------------------------------------------------------------------- +%% Utility functions +%%-------------------------------------------------------------------- + cast(Router, Msg) -> gen_server:cast(Router, Msg). pick(Topic) -> gproc_pool:pick_worker(router, Topic). -dump() -> - ets:tab2list(route). - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -174,7 +163,7 @@ handle_cast({add_route, From, Route}, State) -> {noreply, State}; handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) -> - case lists:member(Route, ets:lookup(route, Topic)) of + case lists:member(Route, ets:lookup(?ROUTE, Topic)) of true -> ok; false -> ok = emqx_router_helper:monitor(Dest), @@ -192,7 +181,7 @@ handle_cast({del_route, From, Route}, State) -> handle_cast({del_route, Route = #route{topic = Topic}}, State) -> %% Confirm if there are still subscribers... - case ets:member(subscriber, Topic) of + case ets:member(emqx_subscriber, Topic) of true -> ok; false -> case emqx_topic:wildcard(Topic) of @@ -206,7 +195,8 @@ handle_cast(Msg, State) -> emqx_log:error("[Router] Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + emqx_log:error("[Router] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> @@ -216,29 +206,29 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- -%% Internal Functions +%% Internal functions %%-------------------------------------------------------------------- add_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:write/1, [Route]). + mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). add_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({route, Topic}) of + case mnesia:wread({?ROUTE, Topic}) of [] -> emqx_trie:insert(Topic); _ -> ok end, - mnesia:write(Route). + mnesia:write(?ROUTE, Route, sticky_write). del_direct_route(Route) -> - mnesia:async_dirty(fun mnesia:delete_object/1, [Route]). + mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). del_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({route, Topic}) of + case mnesia:wread({?ROUTE, Topic}) of [Route] -> %% Remove route and trie - mnesia:delete_object(Route), + mnesia:delete_object(?ROUTE, Route, sticky_write), emqx_trie:delete(Topic); [_|_] -> %% Remove route only - mnesia:delete_object(Route); + mnesia:delete_object(?ROUTE, Route, sticky_write); [] -> ok end. diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 071bc7481..5a4de5f69 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -41,7 +41,7 @@ -define(SERVER, ?MODULE). --define(TABLE, routing_node). +-define(TAB, emqx_routing_node). -define(LOCK, {?MODULE, clean_routes}). @@ -50,14 +50,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = ekka_mnesia:create_table(?TABLE, [ + ok = ekka_mnesia:create_table(?TAB, [ {type, set}, {ram_copies, [node()]}, {record_name, routing_node}, {attributes, record_info(fields, routing_node)}]); mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TABLE). + ok = ekka_mnesia:copy_table(?TAB). %%-------------------------------------------------------------------- %% API @@ -73,9 +73,10 @@ start_link(StatsFun) -> monitor({_Group, Node}) -> monitor(Node); monitor(Node) when is_atom(Node) -> - case ekka:is_member(Node) orelse ets:member(?TABLE, Node) of + case ekka:is_member(Node) orelse ets:member(?TAB, Node) of true -> ok; - false -> mnesia:dirty_write(#routing_node{name = Node, ts = os:timestamp()}) + false -> + mnesia:dirty_write(?TAB, #routing_node{name = Node, ts = os:timestamp()}) end. %%-------------------------------------------------------------------- @@ -84,7 +85,7 @@ monitor(Node) when is_atom(Node) -> init([StatsFun]) -> ekka:monitor(membership), - mnesia:subscribe({table, ?TABLE, simple}), + mnesia:subscribe({table, ?TAB, simple}), Nodes = lists:foldl( fun(Node, Acc) -> case ekka:is_member(Node) of @@ -92,7 +93,7 @@ init([StatsFun]) -> false -> _ = erlang:monitor_node(Node, true), [Node | Acc] end - end, [], mnesia:dirty_all_keys(?TABLE)), + end, [], mnesia:dirty_all_keys(?TAB)), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{nodes = Nodes, stats_fun = StatsFun, stats_timer = TRef}}. @@ -119,9 +120,9 @@ handle_info({mnesia_table_event, _Event}, State) -> handle_info({nodedown, Node}, State = #state{nodes = Nodes}) -> global:trans({?LOCK, self()}, fun() -> - mnesia:transaction(fun clean_routes/1, [Node]) + mnesia:transaction(fun cleanup_routes/1, [Node]) end), - mnesia:dirty_delete(routing_node, Node), + mnesia:dirty_delete(?TAB, Node), handle_info(stats, State#state{nodes = lists:delete(Node, Nodes)}); handle_info({membership, {mnesia, down, Node}}, State) -> @@ -131,7 +132,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(stats, State = #state{stats_fun = StatsFun}) -> - ok = StatsFun(mnesia:table_info(route, size)), + ok = StatsFun(mnesia:table_info(emqx_route, size)), {noreply, State, hibernate}; handle_info(Info, State) -> @@ -141,7 +142,7 @@ handle_info(Info, State) -> terminate(_Reason, #state{stats_timer = TRef}) -> timer:cancel(TRef), ekka:unmonitor(membership), - mnesia:unsubscribe({table, ?TABLE, simple}). + mnesia:unsubscribe({table, ?TAB, simple}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -150,9 +151,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -clean_routes(Node) -> +cleanup_routes(Node) -> Patterns = [#route{_ = '_', dest = Node}, #route{_ = '_', dest = {'_', Node}}], - [mnesia:delete_object(R) || P <- Patterns, - R <- mnesia:match_object(P)]. + [mnesia:delete_object(?TAB, R, write) + || Pat <- Patterns, R <- mnesia:match_object(?TAB, Pat, write)]. diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 6b31cc864..a5a3a5d9f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -35,7 +35,7 @@ -define(SERVER, ?MODULE). --define(TAB, shared_subscription). +-define(TAB, emqx_shared_subscription). -record(state, {pmon}). @@ -57,22 +57,30 @@ start_link() -> -spec(strategy() -> random | hash). strategy() -> - application:get_env(emqx, load_balancing_strategy, random). + emqx_config:get_env(shared_subscription_strategy, random). subscribe(undefined, _Topic, _SubPid) -> ok; subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> - mnesia:dirty_write(r(Group, Topic, SubPid)), + mnesia:dirty_write(?TAB, r(Group, Topic, SubPid)), gen_server:cast(?SERVER, {monitor, SubPid}). unsubscribe(undefined, _Topic, _SubPid) -> ok; unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> - mnesia:dirty_delete_object(r(Group, Topic, SubPid)). + mnesia:dirty_delete_object(?TAB, r(Group, Topic, SubPid)). r(Group, Topic, SubPid) -> #shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +dispatch({Cluster, Group}, Topic, Delivery) -> + case ekka:cluster_name() of + Cluster -> + dispatch(Group, Topic, Delivery); + _ -> Delivery + end; + +%% TODO: ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case pick(subscribers(Group, Topic)) of false -> Delivery; @@ -90,8 +98,7 @@ pick(SubPids) -> lists:nth((X rem length(SubPids)) + 1, SubPids). subscribers(Group, Topic) -> - MP = {shared_subscription, Group, Topic, '$1'}, - ets:select(shared_subscription, [{MP, [], ['$1']}]). + ets:select(?TAB, [{{shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -134,7 +141,7 @@ handle_info({mnesia_table_event, _Event}, State) -> handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> emqx_log:info("Shared subscription down: ~p", [SubPid]), - mnesia:transaction(fun clean_down/1, [SubPid]), + mnesia:async_dirty(fun cleanup_down/1, [SubPid]), {noreply, State#state{pmon = PMon:erase(SubPid)}}; handle_info(Info, State) -> @@ -151,7 +158,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -clean_down(SubPid) -> - MP = #shared_subscription{_ = '_', subpid = SubPid}, - lists:foreach(fun mnesia:delete_object/1, mnesia:match_object(MP)). +cleanup_down(SubPid) -> + Pat = #shared_subscription{_ = '_', subpid = SubPid}, + lists:foreach(fun(Record) -> + mnesia:delete_object(?TAB, Record) + end, mnesia:match_object(Pat)). diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index fc818a81b..a1b67561d 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -27,6 +27,10 @@ %% Trie API -export([insert/1, match/1, lookup/1, delete/1]). +%% Tables +-define(TRIE, emqx_trie). +-define(TRIE_NODE, emqx_trie_node). + %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- @@ -35,21 +39,21 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> %% Trie Table - ok = ekka_mnesia:create_table(trie, [ + ok = ekka_mnesia:create_table(?TRIE, [ {ram_copies, [node()]}, {record_name, trie}, {attributes, record_info(fields, trie)}]), %% Trie Node Table - ok = ekka_mnesia:create_table(trie_node, [ + ok = ekka_mnesia:create_table(?TRIE_NODE, [ {ram_copies, [node()]}, {record_name, trie_node}, {attributes, record_info(fields, trie_node)}]); mnesia(copy) -> %% Copy Trie Table - ok = ekka_mnesia:copy_table(trie), + ok = ekka_mnesia:copy_table(?TRIE), %% Copy Trie Node Table - ok = ekka_mnesia:copy_table(trie_node). + ok = ekka_mnesia:copy_table(?TRIE_NODE). %%-------------------------------------------------------------------- %% Trie API @@ -58,7 +62,7 @@ mnesia(copy) -> %% @doc Insert a topic into the trie -spec(insert(Topic :: topic()) -> ok). insert(Topic) when is_binary(Topic) -> - case mnesia:read(trie_node, Topic) of + case mnesia:read(?TRIE_NODE, Topic) of [#trie_node{topic = Topic}] -> ok; [TrieNode = #trie_node{topic = undefined}] -> @@ -79,14 +83,14 @@ match(Topic) when is_binary(Topic) -> %% @doc Lookup a trie node -spec(lookup(NodeId :: binary()) -> [#trie_node{}]). lookup(NodeId) -> - mnesia:read(trie_node, NodeId). + mnesia:read(?TRIE_NODE, NodeId). %% @doc Delete a topic from the trie -spec(delete(Topic :: topic()) -> ok). delete(Topic) when is_binary(Topic) -> - case mnesia:read(trie_node, Topic) of + case mnesia:read(?TRIE_NODE, Topic) of [#trie_node{edge_count = 0}] -> - mnesia:delete({trie_node, Topic}), + mnesia:delete({?TRIE_NODE, Topic}), delete_path(lists:reverse(emqx_topic:triples(Topic))); [TrieNode] -> write_trie_node(TrieNode#trie_node{topic = undefined}); @@ -102,9 +106,9 @@ delete(Topic) when is_binary(Topic) -> %% @doc Add a path to the trie. add_path({Node, Word, Child}) -> Edge = #trie_edge{node_id = Node, word = Word}, - case mnesia:read(trie_node, Node) of + case mnesia:read(?TRIE_NODE, Node) of [TrieNode = #trie_node{edge_count = Count}] -> - case mnesia:wread({trie, Edge}) of + case mnesia:wread({?TRIE, Edge}) of [] -> write_trie_node(TrieNode#trie_node{edge_count = Count+1}), write_trie(#trie{edge = Edge, node_id = Child}); @@ -125,11 +129,11 @@ match_node(NodeId, Words) -> match_node(NodeId, Words, []). match_node(NodeId, [], ResAcc) -> - mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc); + mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc); match_node(NodeId, [W|Words], ResAcc) -> lists:foldl(fun(WArg, Acc) -> - case mnesia:read(trie, #trie_edge{node_id = NodeId, word = WArg}) of + case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc); [] -> Acc end @@ -138,9 +142,9 @@ match_node(NodeId, [W|Words], ResAcc) -> %% @private %% @doc Match node with '#'. 'match_#'(NodeId, ResAcc) -> - case mnesia:read(trie, #trie_edge{node_id = NodeId, word = '#'}) of + case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of [#trie{node_id = ChildId}] -> - mnesia:read(trie_node, ChildId) ++ ResAcc; + mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc; [] -> ResAcc end. @@ -150,10 +154,10 @@ match_node(NodeId, [W|Words], ResAcc) -> delete_path([]) -> ok; delete_path([{NodeId, Word, _} | RestPath]) -> - mnesia:delete({trie, #trie_edge{node_id = NodeId, word = Word}}), - case mnesia:read(trie_node, NodeId) of + mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), + case mnesia:read(?TRIE_NODE, NodeId) of [#trie_node{edge_count = 1, topic = undefined}] -> - mnesia:delete({trie_node, NodeId}), + mnesia:delete({?TRIE_NODE, NodeId}), delete_path(RestPath); [TrieNode = #trie_node{edge_count = 1, topic = _}] -> write_trie_node(TrieNode#trie_node{edge_count = 0}); @@ -163,11 +167,9 @@ delete_path([{NodeId, Word, _} | RestPath]) -> mnesia:abort({node_not_found, NodeId}) end. -%% @private write_trie(Trie) -> - mnesia:write(trie, Trie, write). + mnesia:write(?TRIE, Trie, write). -%% @private write_trie_node(TrieNode) -> - mnesia:write(trie_node, TrieNode, write). + mnesia:write(?TRIE_NODE, TrieNode, write). diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index e0b0b569d..87eb52fea 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -131,5 +131,5 @@ t_delete3(_) -> end). clear_tables() -> - lists:foreach(fun mnesia:clear_table/1, [trie, trie_node]). + lists:foreach(fun mnesia:clear_table/1, [emqx_trie, emqx_trie_node]).