Refactor the broker, router modules
This commit is contained in:
parent
bcdcb30af5
commit
bbb66ff92e
|
@ -154,10 +154,7 @@
|
||||||
%% Route
|
%% Route
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-record(route,
|
-record(route, { topic :: topic(), dest }).
|
||||||
{ topic :: topic(),
|
|
||||||
dest :: {binary(), node()} | node()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-type(route() :: #route{}).
|
-type(route() :: #route{}).
|
||||||
|
|
||||||
|
@ -170,7 +167,7 @@
|
||||||
-record(trie_node,
|
-record(trie_node,
|
||||||
{ node_id :: trie_node_id(),
|
{ node_id :: trie_node_id(),
|
||||||
edge_count = 0 :: non_neg_integer(),
|
edge_count = 0 :: non_neg_integer(),
|
||||||
topic :: binary() | undefined,
|
topic :: topic() | undefined,
|
||||||
flags :: list(atom())
|
flags :: list(atom())
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -32,30 +32,33 @@
|
||||||
|
|
||||||
-export([topics/0]).
|
-export([topics/0]).
|
||||||
|
|
||||||
-export([getopts/2, setopts/3]).
|
-export([get_subopts/2, set_subopts/3]).
|
||||||
|
|
||||||
-export([dump/0]).
|
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {pool, id, subids :: map(), submon :: emqx_pmon:pmon()}).
|
-record(state, {pool, id, submon}).
|
||||||
|
|
||||||
-define(BROKER, ?MODULE).
|
-define(BROKER, ?MODULE).
|
||||||
|
|
||||||
-define(TIMEOUT, 120000).
|
-define(TIMEOUT, 120000).
|
||||||
|
|
||||||
|
%% ETS tables
|
||||||
|
-define(SUBOPTION, emqx_suboption).
|
||||||
|
-define(SUBSCRIBER, emqx_subscriber).
|
||||||
|
-define(SUBSCRIPTION, emqx_subscription).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Start a broker
|
%% Start a broker
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 2000}]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Sub/Unsub
|
%% Subscriber/Unsubscribe
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(subscribe(topic()) -> ok | {error, term()}).
|
-spec(subscribe(topic()) -> ok | {error, term()}).
|
||||||
|
@ -108,7 +111,7 @@ publish(Msg = #message{from = From}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish(Topic, Msg) ->
|
publish(Topic, Msg) ->
|
||||||
route(emqx_router:match_routes(Topic), delivery(Msg)).
|
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg)).
|
||||||
|
|
||||||
route([], Delivery = #delivery{message = Msg}) ->
|
route([], Delivery = #delivery{message = Msg}) ->
|
||||||
emqx_hooks:run('message.dropped', [undefined, Msg]),
|
emqx_hooks:run('message.dropped', [undefined, Msg]),
|
||||||
|
@ -120,14 +123,29 @@ route([{To, Node}], Delivery) when Node =:= node() ->
|
||||||
route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
|
route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
|
||||||
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
|
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
|
||||||
|
|
||||||
route([{To, Group}], Delivery) when is_binary(Group) ->
|
route([{To, Shared}], Delivery) when is_tuple(Shared); is_binary(Shared) ->
|
||||||
emqx_shared_sub:dispatch(Group, To, Delivery);
|
emqx_shared_sub:dispatch(Shared, To, Delivery);
|
||||||
|
|
||||||
route(Routes, Delivery) ->
|
route(Routes, Delivery) ->
|
||||||
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
|
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
|
||||||
|
|
||||||
|
aggre([]) ->
|
||||||
|
[];
|
||||||
|
aggre([{To, Dest}]) ->
|
||||||
|
[{To, Dest}];
|
||||||
|
aggre(Routes) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({To, Node}, Acc) when is_atom(Node) ->
|
||||||
|
[{To, Node} | Acc];
|
||||||
|
({To, {Group, _Node}}, Acc) ->
|
||||||
|
lists:usort([{To, Group} | Acc]);
|
||||||
|
({To, {Cluster, Group, _Node}}, Acc) ->
|
||||||
|
lists:usort([{To, {Cluster, Group}} | Acc])
|
||||||
|
end, [], Routes).
|
||||||
|
|
||||||
%% @doc Forward message to another node.
|
%% @doc Forward message to another node.
|
||||||
forward(Node, To, Delivery) ->
|
forward(Node, To, Delivery) ->
|
||||||
|
%% rpc:call to ensure the delivery, but the latency:(
|
||||||
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
{badrpc, Reason} ->
|
{badrpc, Reason} ->
|
||||||
emqx_log:error("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
|
emqx_log:error("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
|
||||||
|
@ -169,33 +187,37 @@ delivery(Msg) ->
|
||||||
#delivery{message = Msg, flows = []}.
|
#delivery{message = Msg, flows = []}.
|
||||||
|
|
||||||
subscribers(Topic) ->
|
subscribers(Topic) ->
|
||||||
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
|
try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end.
|
||||||
|
|
||||||
subscriptions(Subscriber) ->
|
subscriptions(Subscriber) ->
|
||||||
lists:map(fun({_, {share, _Group, Topic}}) ->
|
lists:map(fun({_, {share, _Group, Topic}}) ->
|
||||||
subscription(Topic, Subscriber);
|
subscription(Topic, Subscriber);
|
||||||
({_, Topic}) ->
|
({_, Topic}) ->
|
||||||
subscription(Topic, Subscriber)
|
subscription(Topic, Subscriber)
|
||||||
end, ets:lookup(subscription, Subscriber)).
|
end, ets:lookup(?SUBSCRIPTION, Subscriber)).
|
||||||
|
|
||||||
subscription(Topic, Subscriber) ->
|
subscription(Topic, Subscriber) ->
|
||||||
{Topic, ets:lookup_element(suboption, {Topic, Subscriber}, 2)}.
|
{Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
|
||||||
|
|
||||||
-spec(subscribed(topic(), subscriber()) -> boolean()).
|
-spec(subscribed(topic(), subscriber()) -> boolean()).
|
||||||
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
||||||
ets:member(suboption, {Topic, SubPid});
|
ets:member(?SUBOPTION, {Topic, SubPid});
|
||||||
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
|
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
|
||||||
length(ets:match_object(suboption, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1;
|
length(ets:match_object(?SUBOPTION, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1;
|
||||||
subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) ->
|
subscribed(Topic, {SubId, SubPid}) when is_binary(Topic),
|
||||||
ets:member(suboption, {Topic, {SubId, SubPid}}).
|
is_binary(SubId),
|
||||||
|
is_pid(SubPid) ->
|
||||||
|
ets:member(?SUBOPTION, {Topic, {SubId, SubPid}}).
|
||||||
|
|
||||||
topics() -> emqx_router:topics().
|
topics() -> emqx_router:topics().
|
||||||
|
|
||||||
getopts(Topic, Subscriber) when is_binary(Topic) ->
|
get_subopts(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
try ets:lookup_element(suboption, {Topic, Subscriber}, 2) catch error:badarg ->[] end.
|
try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)
|
||||||
|
catch error:badarg -> []
|
||||||
|
end.
|
||||||
|
|
||||||
setopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) ->
|
set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) ->
|
||||||
gen_server:call(pick(Subscriber), {setopts, Topic, Subscriber, Opts}).
|
gen_server:call(pick(Subscriber), {set_subopts, Topic, Subscriber, Opts}).
|
||||||
|
|
||||||
with_subpid(SubPid) when is_pid(SubPid) ->
|
with_subpid(SubPid) when is_pid(SubPid) ->
|
||||||
SubPid;
|
SubPid;
|
||||||
|
@ -220,22 +242,19 @@ pick(SubId) when is_binary(SubId) ->
|
||||||
pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
|
pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
|
||||||
pick(SubId).
|
pick(SubId).
|
||||||
|
|
||||||
dump() ->
|
|
||||||
[{Tab, ets:tab2list(Tab)} || Tab <- [subscription, subscriber, suboption]].
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
gproc_pool:connect_worker(Pool, {Pool, Id}),
|
gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
{ok, #state{pool = Pool, id = Id, subids = #{}, submon = emqx_pmon:new()}}.
|
{ok, #state{pool = Pool, id = Id, submon = emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call({setopts, Topic, Subscriber, Opts}, _From, State) ->
|
handle_call({set_subopts, Topic, Subscriber, Opts}, _From, State) ->
|
||||||
case ets:lookup(suboption, {Topic, Subscriber}) of
|
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
||||||
[{_, OldOpts}] ->
|
[{_, OldOpts}] ->
|
||||||
Opts1 = lists:usort(lists:umerge(Opts, OldOpts)),
|
Opts1 = lists:usort(lists:umerge(Opts, OldOpts)),
|
||||||
ets:insert(suboption, {{Topic, Subscriber}, Opts1}),
|
ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1}),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
[] ->
|
[] ->
|
||||||
{reply, {error, not_found}, State}
|
{reply, {error, not_found}, State}
|
||||||
|
@ -246,12 +265,12 @@ handle_call(Request, _From, State) ->
|
||||||
{reply, ignore, State}.
|
{reply, ignore, State}.
|
||||||
|
|
||||||
handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->
|
handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->
|
||||||
case ets:lookup(suboption, {Topic, Subscriber}) of
|
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
||||||
[] ->
|
[] ->
|
||||||
Group = proplists:get_value(share, Options),
|
Group = proplists:get_value(share, Options),
|
||||||
true = do_subscribe(Group, Topic, Subscriber, Options),
|
true = do_subscribe(Group, Topic, Subscriber, Options),
|
||||||
emqx_shared_sub:subscribe(Group, Topic, subpid(Subscriber)),
|
emqx_shared_sub:subscribe(Group, Topic, subpid(Subscriber)),
|
||||||
emqx_router:add_route(From, Topic, dest(Options)),
|
emqx_router:add_route(From, Topic, destination(Options)),
|
||||||
{noreply, monitor_subscriber(Subscriber, State)};
|
{noreply, monitor_subscriber(Subscriber, State)};
|
||||||
[_] ->
|
[_] ->
|
||||||
gen_server:reply(From, ok),
|
gen_server:reply(From, ok),
|
||||||
|
@ -259,13 +278,13 @@ handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) ->
|
handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) ->
|
||||||
case ets:lookup(suboption, {Topic, Subscriber}) of
|
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
||||||
[{_, Options}] ->
|
[{_, Options}] ->
|
||||||
Group = proplists:get_value(share, Options),
|
Group = proplists:get_value(share, Options),
|
||||||
true = do_unsubscribe(Group, Topic, Subscriber),
|
true = do_unsubscribe(Group, Topic, Subscriber),
|
||||||
emqx_shared_sub:unsubscribe(Group, Topic, subpid(Subscriber)),
|
emqx_shared_sub:unsubscribe(Group, Topic, subpid(Subscriber)),
|
||||||
case ets:member(subscriber, Topic) of
|
case ets:member(?SUBSCRIBER, Topic) of
|
||||||
false -> emqx_router:del_route(From, Topic, dest(Options));
|
false -> emqx_router:del_route(From, Topic, destination(Options));
|
||||||
true -> gen_server:reply(From, ok)
|
true -> gen_server:reply(From, ok)
|
||||||
end;
|
end;
|
||||||
[] -> gen_server:reply(From, ok)
|
[] -> gen_server:reply(From, ok)
|
||||||
|
@ -276,23 +295,24 @@ handle_cast(Msg, State) ->
|
||||||
emqx_log:error("[Broker] Unexpected msg: ~p", [Msg]),
|
emqx_log:error("[Broker] Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{subids = SubIds}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, _Reason},
|
||||||
Subscriber = case maps:find(SubPid, SubIds) of
|
State = #state{submon = SubMon}) ->
|
||||||
{ok, SubId} -> {SubId, SubPid};
|
Subscriber = case SubMon:find(SubPid) of
|
||||||
error -> SubPid
|
undefined -> SubPid;
|
||||||
|
SubId -> {SubId, SubPid}
|
||||||
end,
|
end,
|
||||||
Topics = lists:map(fun({_, {share, _, Topic}}) ->
|
Topics = lists:map(fun({_, {share, _, Topic}}) ->
|
||||||
Topic;
|
Topic;
|
||||||
({_, Topic}) ->
|
({_, Topic}) ->
|
||||||
Topic
|
Topic
|
||||||
end, ets:lookup(subscription, Subscriber)),
|
end, ets:lookup(?SUBSCRIPTION, Subscriber)),
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case ets:lookup(suboption, {Topic, Subscriber}) of
|
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
||||||
[{_, Options}] ->
|
[{_, Options}] ->
|
||||||
Group = proplists:get_value(share, Options),
|
Group = proplists:get_value(share, Options),
|
||||||
true = do_unsubscribe(Group, Topic, Subscriber),
|
true = do_unsubscribe(Group, Topic, Subscriber),
|
||||||
case ets:member(subscriber, Topic) of
|
case ets:member(?SUBSCRIBER, Topic) of
|
||||||
false -> emqx_router:del_route(Topic, dest(Options));
|
false -> emqx_router:del_route(Topic, destination(Options));
|
||||||
true -> ok
|
true -> ok
|
||||||
end;
|
end;
|
||||||
[] -> ok
|
[] -> ok
|
||||||
|
@ -315,25 +335,25 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
do_subscribe(Group, Topic, Subscriber, Options) ->
|
do_subscribe(Group, Topic, Subscriber, Options) ->
|
||||||
ets:insert(subscription, {Subscriber, shared(Group, Topic)}),
|
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
||||||
ets:insert(subscriber, {Topic, shared(Group, Subscriber)}),
|
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
|
||||||
ets:insert(suboption, {{Topic, Subscriber}, Options}).
|
ets:insert(?SUBOPTION, {{Topic, Subscriber}, Options}).
|
||||||
|
|
||||||
do_unsubscribe(Group, Topic, Subscriber) ->
|
do_unsubscribe(Group, Topic, Subscriber) ->
|
||||||
ets:delete_object(subscription, {Subscriber, shared(Group, Topic)}),
|
ets:delete_object(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
||||||
ets:delete_object(subscriber, {Topic, shared(Group, Subscriber)}),
|
ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
|
||||||
ets:delete(suboption, {Topic, Subscriber}).
|
ets:delete(?SUBOPTION, {Topic, Subscriber}).
|
||||||
|
|
||||||
monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) ->
|
monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) ->
|
||||||
State#state{submon = SubMon:monitor(SubPid)};
|
State#state{submon = SubMon:monitor(SubPid)};
|
||||||
|
|
||||||
monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) ->
|
monitor_subscriber({SubId, SubPid}, State = #state{submon = SubMon}) ->
|
||||||
State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}.
|
State#state{submon = SubMon:monitor(SubPid, SubId)}.
|
||||||
|
|
||||||
demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) ->
|
demonitor_subscriber(SubPid, State = #state{submon = SubMon}) ->
|
||||||
State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}.
|
State#state{submon = SubMon:demonitor(SubPid)}.
|
||||||
|
|
||||||
dest(Options) ->
|
destination(Options) ->
|
||||||
case proplists:get_value(share, Options) of
|
case proplists:get_value(share, Options) of
|
||||||
undefined -> node();
|
undefined -> node();
|
||||||
Group -> {Group, node()}
|
Group -> {Group, node()}
|
||||||
|
|
|
@ -33,7 +33,8 @@ start_link() ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% Create the pubsub tables
|
%% Create the pubsub tables
|
||||||
lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]),
|
lists:foreach(fun create_tab/1,
|
||||||
|
[subscription, subscriber, suboption]),
|
||||||
|
|
||||||
%% Shared subscription
|
%% Shared subscription
|
||||||
Shared = {shared_sub, {emqx_shared_sub, start_link, []},
|
Shared = {shared_sub, {emqx_shared_sub, start_link, []},
|
||||||
|
@ -57,24 +58,17 @@ init([]) ->
|
||||||
|
|
||||||
create_tab(suboption) ->
|
create_tab(suboption) ->
|
||||||
%% Suboption: {Topic, Sub} -> [{qos, 1}]
|
%% Suboption: {Topic, Sub} -> [{qos, 1}]
|
||||||
ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]);
|
emqx_tables:create(emqx_suboption, [public, set | ?CONCURRENCY_OPTS]);
|
||||||
|
|
||||||
create_tab(subscriber) ->
|
create_tab(subscriber) ->
|
||||||
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
|
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
|
||||||
%% duplicate_bag: o(1) insert
|
%% duplicate_bag: o(1) insert
|
||||||
ensure_tab(subscriber, [duplicate_bag | ?CONCURRENCY_OPTS]);
|
emqx_tables:create(emqx_subscriber, [public, duplicate_bag | ?CONCURRENCY_OPTS]);
|
||||||
|
|
||||||
create_tab(subscription) ->
|
create_tab(subscription) ->
|
||||||
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
|
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
|
||||||
%% bag: o(n) insert
|
%% bag: o(n) insert
|
||||||
ensure_tab(subscription, [bag | ?CONCURRENCY_OPTS]).
|
emqx_tables:create(emqx_subscription, [public, bag | ?CONCURRENCY_OPTS]).
|
||||||
|
|
||||||
ensure_tab(Tab, Opts) ->
|
|
||||||
case ets:info(Tab, name) of
|
|
||||||
undefined ->
|
|
||||||
ets:new(Tab, lists:usort([public, named_table | Opts]));
|
|
||||||
Tab -> Tab
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Stats function
|
%% Stats function
|
||||||
|
@ -83,8 +77,8 @@ ensure_tab(Tab, Opts) ->
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
fun() ->
|
fun() ->
|
||||||
emqx_stats:setstat('subscribers/count', 'subscribers/max',
|
emqx_stats:setstat('subscribers/count', 'subscribers/max',
|
||||||
ets:info(subscriber, size)),
|
ets:info(emqx_subscriber, size)),
|
||||||
emqx_stats:setstat('subscriptions/count', 'subscriptions/max',
|
emqx_stats:setstat('subscriptions/count', 'subscriptions/max',
|
||||||
ets:info(subscription, size))
|
ets:info(emqx_subscription, size))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
|
-include_lib("ekka/include/ekka.hrl").
|
||||||
|
|
||||||
%% Mnesia Bootstrap
|
%% Mnesia Bootstrap
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
@ -32,42 +34,44 @@
|
||||||
%% Topics
|
%% Topics
|
||||||
-export([topics/0]).
|
-export([topics/0]).
|
||||||
|
|
||||||
%% Route Management APIs
|
%% Route management APIs
|
||||||
-export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]).
|
-export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]).
|
||||||
|
|
||||||
%% Match, print routes
|
|
||||||
-export([has_routes/1, match_routes/1, print_routes/1]).
|
-export([has_routes/1, match_routes/1, print_routes/1]).
|
||||||
|
|
||||||
-export([dump/0]).
|
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-type(group() :: binary()).
|
||||||
|
|
||||||
|
-type(destination() :: node() | {group(), node()}
|
||||||
|
| {cluster(), group(), node()}).
|
||||||
|
|
||||||
-record(state, {pool, id}).
|
-record(state, {pool, id}).
|
||||||
|
|
||||||
-type(destination() :: node() | {binary(), node()}).
|
-define(ROUTE, emqx_route).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia Bootstrap
|
%% Mnesia Bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
ok = ekka_mnesia:create_table(route, [
|
ok = ekka_mnesia:create_table(?ROUTE, [
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, route},
|
{record_name, route},
|
||||||
{attributes, record_info(fields, route)}]);
|
{attributes, record_info(fields, route)}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
ok = ekka_mnesia:copy_table(route).
|
ok = ekka_mnesia:copy_table(?ROUTE).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Start a router
|
%% Start a router
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 10000}]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Add/Del Routes
|
%% Add/Del Routes
|
||||||
|
@ -85,7 +89,7 @@ add_route(From, Topic, Dest) when is_binary(Topic) ->
|
||||||
%% @doc Get routes
|
%% @doc Get routes
|
||||||
-spec(get_routes(topic()) -> [route()]).
|
-spec(get_routes(topic()) -> [route()]).
|
||||||
get_routes(Topic) ->
|
get_routes(Topic) ->
|
||||||
ets:lookup(route, Topic).
|
ets:lookup(?ROUTE, Topic).
|
||||||
|
|
||||||
%% @doc Delete a route
|
%% @doc Delete a route
|
||||||
-spec(del_route(topic(), destination()) -> ok).
|
-spec(del_route(topic(), destination()) -> ok).
|
||||||
|
@ -99,45 +103,29 @@ del_route(From, Topic, Dest) when is_binary(Topic) ->
|
||||||
%% @doc Has routes?
|
%% @doc Has routes?
|
||||||
-spec(has_routes(topic()) -> boolean()).
|
-spec(has_routes(topic()) -> boolean()).
|
||||||
has_routes(Topic) when is_binary(Topic) ->
|
has_routes(Topic) when is_binary(Topic) ->
|
||||||
ets:member(route, Topic).
|
ets:member(?ROUTE, Topic).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics
|
%% Topics
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(topics() -> list(binary())).
|
-spec(topics() -> list(binary())).
|
||||||
topics() ->
|
topics() -> mnesia:dirty_all_keys(?ROUTE).
|
||||||
mnesia:dirty_all_keys(route).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Match Routes
|
%% Match routes
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Match routes
|
%% @doc Match routes
|
||||||
|
%% Optimize: routing table will be replicated to all router nodes.
|
||||||
-spec(match_routes(Topic:: topic()) -> [{topic(), binary() | node()}]).
|
-spec(match_routes(Topic:: topic()) -> [{topic(), binary() | node()}]).
|
||||||
match_routes(Topic) when is_binary(Topic) ->
|
match_routes(Topic) when is_binary(Topic) ->
|
||||||
%% Optimize: ets???
|
|
||||||
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
|
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
|
||||||
%% Optimize: route table will be replicated to all nodes.
|
Routes = [ets:lookup(?ROUTE, To) || To <- [Topic | Matched]],
|
||||||
aggre(lists:append([ets:lookup(route, To) || To <- [Topic | Matched]])).
|
[{To, Dest} || #route{topic = To, dest = Dest} <- lists:append(Routes)].
|
||||||
|
|
||||||
%% Aggregate routes
|
|
||||||
aggre([]) ->
|
|
||||||
[];
|
|
||||||
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
|
|
||||||
[{To, Node}];
|
|
||||||
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
|
|
||||||
[{To, Group}];
|
|
||||||
aggre(Routes) ->
|
|
||||||
lists:foldl(
|
|
||||||
fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
|
|
||||||
[{To, Node} | Acc];
|
|
||||||
(#route{topic = To, dest = {Group, _}}, Acc) ->
|
|
||||||
lists:usort([{To, Group} | Acc])
|
|
||||||
end, [], Routes).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Print Routes
|
%% Print routes
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Print routes to a topic
|
%% @doc Print routes to a topic
|
||||||
|
@ -147,15 +135,16 @@ print_routes(Topic) ->
|
||||||
io:format("~s -> ~s~n", [To, Dest])
|
io:format("~s -> ~s~n", [To, Dest])
|
||||||
end, match_routes(Topic)).
|
end, match_routes(Topic)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Utility functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
cast(Router, Msg) ->
|
cast(Router, Msg) ->
|
||||||
gen_server:cast(Router, Msg).
|
gen_server:cast(Router, Msg).
|
||||||
|
|
||||||
pick(Topic) ->
|
pick(Topic) ->
|
||||||
gproc_pool:pick_worker(router, Topic).
|
gproc_pool:pick_worker(router, Topic).
|
||||||
|
|
||||||
dump() ->
|
|
||||||
ets:tab2list(route).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -174,7 +163,7 @@ handle_cast({add_route, From, Route}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
|
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
|
||||||
case lists:member(Route, ets:lookup(route, Topic)) of
|
case lists:member(Route, ets:lookup(?ROUTE, Topic)) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_router_helper:monitor(Dest),
|
ok = emqx_router_helper:monitor(Dest),
|
||||||
|
@ -192,7 +181,7 @@ handle_cast({del_route, From, Route}, State) ->
|
||||||
|
|
||||||
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
|
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
|
||||||
%% Confirm if there are still subscribers...
|
%% Confirm if there are still subscribers...
|
||||||
case ets:member(subscriber, Topic) of
|
case ets:member(emqx_subscriber, Topic) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false ->
|
false ->
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
@ -206,7 +195,8 @@ handle_cast(Msg, State) ->
|
||||||
emqx_log:error("[Router] Unexpected msg: ~p", [Msg]),
|
emqx_log:error("[Router] Unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
emqx_log:error("[Router] Unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||||
|
@ -216,29 +206,29 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
add_direct_route(Route) ->
|
add_direct_route(Route) ->
|
||||||
mnesia:async_dirty(fun mnesia:write/1, [Route]).
|
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
|
||||||
|
|
||||||
add_trie_route(Route = #route{topic = Topic}) ->
|
add_trie_route(Route = #route{topic = Topic}) ->
|
||||||
case mnesia:wread({route, Topic}) of
|
case mnesia:wread({?ROUTE, Topic}) of
|
||||||
[] -> emqx_trie:insert(Topic);
|
[] -> emqx_trie:insert(Topic);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
mnesia:write(Route).
|
mnesia:write(?ROUTE, Route, sticky_write).
|
||||||
|
|
||||||
del_direct_route(Route) ->
|
del_direct_route(Route) ->
|
||||||
mnesia:async_dirty(fun mnesia:delete_object/1, [Route]).
|
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
|
||||||
|
|
||||||
del_trie_route(Route = #route{topic = Topic}) ->
|
del_trie_route(Route = #route{topic = Topic}) ->
|
||||||
case mnesia:wread({route, Topic}) of
|
case mnesia:wread({?ROUTE, Topic}) of
|
||||||
[Route] -> %% Remove route and trie
|
[Route] -> %% Remove route and trie
|
||||||
mnesia:delete_object(Route),
|
mnesia:delete_object(?ROUTE, Route, sticky_write),
|
||||||
emqx_trie:delete(Topic);
|
emqx_trie:delete(Topic);
|
||||||
[_|_] -> %% Remove route only
|
[_|_] -> %% Remove route only
|
||||||
mnesia:delete_object(Route);
|
mnesia:delete_object(?ROUTE, Route, sticky_write);
|
||||||
[] -> ok
|
[] -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(TABLE, routing_node).
|
-define(TAB, emqx_routing_node).
|
||||||
|
|
||||||
-define(LOCK, {?MODULE, clean_routes}).
|
-define(LOCK, {?MODULE, clean_routes}).
|
||||||
|
|
||||||
|
@ -50,14 +50,14 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
ok = ekka_mnesia:create_table(?TABLE, [
|
ok = ekka_mnesia:create_table(?TAB, [
|
||||||
{type, set},
|
{type, set},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, routing_node},
|
{record_name, routing_node},
|
||||||
{attributes, record_info(fields, routing_node)}]);
|
{attributes, record_info(fields, routing_node)}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
ok = ekka_mnesia:copy_table(?TABLE).
|
ok = ekka_mnesia:copy_table(?TAB).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -73,9 +73,10 @@ start_link(StatsFun) ->
|
||||||
monitor({_Group, Node}) ->
|
monitor({_Group, Node}) ->
|
||||||
monitor(Node);
|
monitor(Node);
|
||||||
monitor(Node) when is_atom(Node) ->
|
monitor(Node) when is_atom(Node) ->
|
||||||
case ekka:is_member(Node) orelse ets:member(?TABLE, Node) of
|
case ekka:is_member(Node) orelse ets:member(?TAB, Node) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> mnesia:dirty_write(#routing_node{name = Node, ts = os:timestamp()})
|
false ->
|
||||||
|
mnesia:dirty_write(?TAB, #routing_node{name = Node, ts = os:timestamp()})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -84,7 +85,7 @@ monitor(Node) when is_atom(Node) ->
|
||||||
|
|
||||||
init([StatsFun]) ->
|
init([StatsFun]) ->
|
||||||
ekka:monitor(membership),
|
ekka:monitor(membership),
|
||||||
mnesia:subscribe({table, ?TABLE, simple}),
|
mnesia:subscribe({table, ?TAB, simple}),
|
||||||
Nodes = lists:foldl(
|
Nodes = lists:foldl(
|
||||||
fun(Node, Acc) ->
|
fun(Node, Acc) ->
|
||||||
case ekka:is_member(Node) of
|
case ekka:is_member(Node) of
|
||||||
|
@ -92,7 +93,7 @@ init([StatsFun]) ->
|
||||||
false -> _ = erlang:monitor_node(Node, true),
|
false -> _ = erlang:monitor_node(Node, true),
|
||||||
[Node | Acc]
|
[Node | Acc]
|
||||||
end
|
end
|
||||||
end, [], mnesia:dirty_all_keys(?TABLE)),
|
end, [], mnesia:dirty_all_keys(?TAB)),
|
||||||
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
|
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
|
||||||
{ok, #state{nodes = Nodes, stats_fun = StatsFun, stats_timer = TRef}}.
|
{ok, #state{nodes = Nodes, stats_fun = StatsFun, stats_timer = TRef}}.
|
||||||
|
|
||||||
|
@ -119,9 +120,9 @@ handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
handle_info({nodedown, Node}, State = #state{nodes = Nodes}) ->
|
handle_info({nodedown, Node}, State = #state{nodes = Nodes}) ->
|
||||||
global:trans({?LOCK, self()},
|
global:trans({?LOCK, self()},
|
||||||
fun() ->
|
fun() ->
|
||||||
mnesia:transaction(fun clean_routes/1, [Node])
|
mnesia:transaction(fun cleanup_routes/1, [Node])
|
||||||
end),
|
end),
|
||||||
mnesia:dirty_delete(routing_node, Node),
|
mnesia:dirty_delete(?TAB, Node),
|
||||||
handle_info(stats, State#state{nodes = lists:delete(Node, Nodes)});
|
handle_info(stats, State#state{nodes = lists:delete(Node, Nodes)});
|
||||||
|
|
||||||
handle_info({membership, {mnesia, down, Node}}, State) ->
|
handle_info({membership, {mnesia, down, Node}}, State) ->
|
||||||
|
@ -131,7 +132,7 @@ handle_info({membership, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
||||||
ok = StatsFun(mnesia:table_info(route, size)),
|
ok = StatsFun(mnesia:table_info(emqx_route, size)),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -141,7 +142,7 @@ handle_info(Info, State) ->
|
||||||
terminate(_Reason, #state{stats_timer = TRef}) ->
|
terminate(_Reason, #state{stats_timer = TRef}) ->
|
||||||
timer:cancel(TRef),
|
timer:cancel(TRef),
|
||||||
ekka:unmonitor(membership),
|
ekka:unmonitor(membership),
|
||||||
mnesia:unsubscribe({table, ?TABLE, simple}).
|
mnesia:unsubscribe({table, ?TAB, simple}).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -150,9 +151,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
clean_routes(Node) ->
|
cleanup_routes(Node) ->
|
||||||
Patterns = [#route{_ = '_', dest = Node},
|
Patterns = [#route{_ = '_', dest = Node},
|
||||||
#route{_ = '_', dest = {'_', Node}}],
|
#route{_ = '_', dest = {'_', Node}}],
|
||||||
[mnesia:delete_object(R) || P <- Patterns,
|
[mnesia:delete_object(?TAB, R, write)
|
||||||
R <- mnesia:match_object(P)].
|
|| Pat <- Patterns, R <- mnesia:match_object(?TAB, Pat, write)].
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(TAB, shared_subscription).
|
-define(TAB, emqx_shared_subscription).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
|
@ -57,22 +57,30 @@ start_link() ->
|
||||||
|
|
||||||
-spec(strategy() -> random | hash).
|
-spec(strategy() -> random | hash).
|
||||||
strategy() ->
|
strategy() ->
|
||||||
application:get_env(emqx, load_balancing_strategy, random).
|
emqx_config:get_env(shared_subscription_strategy, random).
|
||||||
|
|
||||||
subscribe(undefined, _Topic, _SubPid) ->
|
subscribe(undefined, _Topic, _SubPid) ->
|
||||||
ok;
|
ok;
|
||||||
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
mnesia:dirty_write(r(Group, Topic, SubPid)),
|
mnesia:dirty_write(?TAB, r(Group, Topic, SubPid)),
|
||||||
gen_server:cast(?SERVER, {monitor, SubPid}).
|
gen_server:cast(?SERVER, {monitor, SubPid}).
|
||||||
|
|
||||||
unsubscribe(undefined, _Topic, _SubPid) ->
|
unsubscribe(undefined, _Topic, _SubPid) ->
|
||||||
ok;
|
ok;
|
||||||
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
mnesia:dirty_delete_object(r(Group, Topic, SubPid)).
|
mnesia:dirty_delete_object(?TAB, r(Group, Topic, SubPid)).
|
||||||
|
|
||||||
r(Group, Topic, SubPid) ->
|
r(Group, Topic, SubPid) ->
|
||||||
#shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
#shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
||||||
|
|
||||||
|
dispatch({Cluster, Group}, Topic, Delivery) ->
|
||||||
|
case ekka:cluster_name() of
|
||||||
|
Cluster ->
|
||||||
|
dispatch(Group, Topic, Delivery);
|
||||||
|
_ -> Delivery
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% TODO: ensure the delivery...
|
||||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
|
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
|
||||||
case pick(subscribers(Group, Topic)) of
|
case pick(subscribers(Group, Topic)) of
|
||||||
false -> Delivery;
|
false -> Delivery;
|
||||||
|
@ -90,8 +98,7 @@ pick(SubPids) ->
|
||||||
lists:nth((X rem length(SubPids)) + 1, SubPids).
|
lists:nth((X rem length(SubPids)) + 1, SubPids).
|
||||||
|
|
||||||
subscribers(Group, Topic) ->
|
subscribers(Group, Topic) ->
|
||||||
MP = {shared_subscription, Group, Topic, '$1'},
|
ets:select(?TAB, [{{shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
||||||
ets:select(shared_subscription, [{MP, [], ['$1']}]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -134,7 +141,7 @@ handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
||||||
emqx_log:info("Shared subscription down: ~p", [SubPid]),
|
emqx_log:info("Shared subscription down: ~p", [SubPid]),
|
||||||
mnesia:transaction(fun clean_down/1, [SubPid]),
|
mnesia:async_dirty(fun cleanup_down/1, [SubPid]),
|
||||||
{noreply, State#state{pmon = PMon:erase(SubPid)}};
|
{noreply, State#state{pmon = PMon:erase(SubPid)}};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -151,7 +158,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
clean_down(SubPid) ->
|
cleanup_down(SubPid) ->
|
||||||
MP = #shared_subscription{_ = '_', subpid = SubPid},
|
Pat = #shared_subscription{_ = '_', subpid = SubPid},
|
||||||
lists:foreach(fun mnesia:delete_object/1, mnesia:match_object(MP)).
|
lists:foreach(fun(Record) ->
|
||||||
|
mnesia:delete_object(?TAB, Record)
|
||||||
|
end, mnesia:match_object(Pat)).
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,10 @@
|
||||||
%% Trie API
|
%% Trie API
|
||||||
-export([insert/1, match/1, lookup/1, delete/1]).
|
-export([insert/1, match/1, lookup/1, delete/1]).
|
||||||
|
|
||||||
|
%% Tables
|
||||||
|
-define(TRIE, emqx_trie).
|
||||||
|
-define(TRIE_NODE, emqx_trie_node).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia Bootstrap
|
%% Mnesia Bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -35,21 +39,21 @@
|
||||||
-spec(mnesia(boot | copy) -> ok).
|
-spec(mnesia(boot | copy) -> ok).
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
%% Trie Table
|
%% Trie Table
|
||||||
ok = ekka_mnesia:create_table(trie, [
|
ok = ekka_mnesia:create_table(?TRIE, [
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, trie},
|
{record_name, trie},
|
||||||
{attributes, record_info(fields, trie)}]),
|
{attributes, record_info(fields, trie)}]),
|
||||||
%% Trie Node Table
|
%% Trie Node Table
|
||||||
ok = ekka_mnesia:create_table(trie_node, [
|
ok = ekka_mnesia:create_table(?TRIE_NODE, [
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, trie_node},
|
{record_name, trie_node},
|
||||||
{attributes, record_info(fields, trie_node)}]);
|
{attributes, record_info(fields, trie_node)}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
%% Copy Trie Table
|
%% Copy Trie Table
|
||||||
ok = ekka_mnesia:copy_table(trie),
|
ok = ekka_mnesia:copy_table(?TRIE),
|
||||||
%% Copy Trie Node Table
|
%% Copy Trie Node Table
|
||||||
ok = ekka_mnesia:copy_table(trie_node).
|
ok = ekka_mnesia:copy_table(?TRIE_NODE).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Trie API
|
%% Trie API
|
||||||
|
@ -58,7 +62,7 @@ mnesia(copy) ->
|
||||||
%% @doc Insert a topic into the trie
|
%% @doc Insert a topic into the trie
|
||||||
-spec(insert(Topic :: topic()) -> ok).
|
-spec(insert(Topic :: topic()) -> ok).
|
||||||
insert(Topic) when is_binary(Topic) ->
|
insert(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(trie_node, Topic) of
|
case mnesia:read(?TRIE_NODE, Topic) of
|
||||||
[#trie_node{topic = Topic}] ->
|
[#trie_node{topic = Topic}] ->
|
||||||
ok;
|
ok;
|
||||||
[TrieNode = #trie_node{topic = undefined}] ->
|
[TrieNode = #trie_node{topic = undefined}] ->
|
||||||
|
@ -79,14 +83,14 @@ match(Topic) when is_binary(Topic) ->
|
||||||
%% @doc Lookup a trie node
|
%% @doc Lookup a trie node
|
||||||
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
||||||
lookup(NodeId) ->
|
lookup(NodeId) ->
|
||||||
mnesia:read(trie_node, NodeId).
|
mnesia:read(?TRIE_NODE, NodeId).
|
||||||
|
|
||||||
%% @doc Delete a topic from the trie
|
%% @doc Delete a topic from the trie
|
||||||
-spec(delete(Topic :: topic()) -> ok).
|
-spec(delete(Topic :: topic()) -> ok).
|
||||||
delete(Topic) when is_binary(Topic) ->
|
delete(Topic) when is_binary(Topic) ->
|
||||||
case mnesia:read(trie_node, Topic) of
|
case mnesia:read(?TRIE_NODE, Topic) of
|
||||||
[#trie_node{edge_count = 0}] ->
|
[#trie_node{edge_count = 0}] ->
|
||||||
mnesia:delete({trie_node, Topic}),
|
mnesia:delete({?TRIE_NODE, Topic}),
|
||||||
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
||||||
[TrieNode] ->
|
[TrieNode] ->
|
||||||
write_trie_node(TrieNode#trie_node{topic = undefined});
|
write_trie_node(TrieNode#trie_node{topic = undefined});
|
||||||
|
@ -102,9 +106,9 @@ delete(Topic) when is_binary(Topic) ->
|
||||||
%% @doc Add a path to the trie.
|
%% @doc Add a path to the trie.
|
||||||
add_path({Node, Word, Child}) ->
|
add_path({Node, Word, Child}) ->
|
||||||
Edge = #trie_edge{node_id = Node, word = Word},
|
Edge = #trie_edge{node_id = Node, word = Word},
|
||||||
case mnesia:read(trie_node, Node) of
|
case mnesia:read(?TRIE_NODE, Node) of
|
||||||
[TrieNode = #trie_node{edge_count = Count}] ->
|
[TrieNode = #trie_node{edge_count = Count}] ->
|
||||||
case mnesia:wread({trie, Edge}) of
|
case mnesia:wread({?TRIE, Edge}) of
|
||||||
[] ->
|
[] ->
|
||||||
write_trie_node(TrieNode#trie_node{edge_count = Count+1}),
|
write_trie_node(TrieNode#trie_node{edge_count = Count+1}),
|
||||||
write_trie(#trie{edge = Edge, node_id = Child});
|
write_trie(#trie{edge = Edge, node_id = Child});
|
||||||
|
@ -125,11 +129,11 @@ match_node(NodeId, Words) ->
|
||||||
match_node(NodeId, Words, []).
|
match_node(NodeId, Words, []).
|
||||||
|
|
||||||
match_node(NodeId, [], ResAcc) ->
|
match_node(NodeId, [], ResAcc) ->
|
||||||
mnesia:read(trie_node, NodeId) ++ 'match_#'(NodeId, ResAcc);
|
mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc);
|
||||||
|
|
||||||
match_node(NodeId, [W|Words], ResAcc) ->
|
match_node(NodeId, [W|Words], ResAcc) ->
|
||||||
lists:foldl(fun(WArg, Acc) ->
|
lists:foldl(fun(WArg, Acc) ->
|
||||||
case mnesia:read(trie, #trie_edge{node_id = NodeId, word = WArg}) of
|
case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of
|
||||||
[#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
|
[#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
|
||||||
[] -> Acc
|
[] -> Acc
|
||||||
end
|
end
|
||||||
|
@ -138,9 +142,9 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Match node with '#'.
|
%% @doc Match node with '#'.
|
||||||
'match_#'(NodeId, ResAcc) ->
|
'match_#'(NodeId, ResAcc) ->
|
||||||
case mnesia:read(trie, #trie_edge{node_id = NodeId, word = '#'}) of
|
case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of
|
||||||
[#trie{node_id = ChildId}] ->
|
[#trie{node_id = ChildId}] ->
|
||||||
mnesia:read(trie_node, ChildId) ++ ResAcc;
|
mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc;
|
||||||
[] ->
|
[] ->
|
||||||
ResAcc
|
ResAcc
|
||||||
end.
|
end.
|
||||||
|
@ -150,10 +154,10 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
||||||
delete_path([]) ->
|
delete_path([]) ->
|
||||||
ok;
|
ok;
|
||||||
delete_path([{NodeId, Word, _} | RestPath]) ->
|
delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||||
mnesia:delete({trie, #trie_edge{node_id = NodeId, word = Word}}),
|
mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}),
|
||||||
case mnesia:read(trie_node, NodeId) of
|
case mnesia:read(?TRIE_NODE, NodeId) of
|
||||||
[#trie_node{edge_count = 1, topic = undefined}] ->
|
[#trie_node{edge_count = 1, topic = undefined}] ->
|
||||||
mnesia:delete({trie_node, NodeId}),
|
mnesia:delete({?TRIE_NODE, NodeId}),
|
||||||
delete_path(RestPath);
|
delete_path(RestPath);
|
||||||
[TrieNode = #trie_node{edge_count = 1, topic = _}] ->
|
[TrieNode = #trie_node{edge_count = 1, topic = _}] ->
|
||||||
write_trie_node(TrieNode#trie_node{edge_count = 0});
|
write_trie_node(TrieNode#trie_node{edge_count = 0});
|
||||||
|
@ -163,11 +167,9 @@ delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||||
mnesia:abort({node_not_found, NodeId})
|
mnesia:abort({node_not_found, NodeId})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
|
||||||
write_trie(Trie) ->
|
write_trie(Trie) ->
|
||||||
mnesia:write(trie, Trie, write).
|
mnesia:write(?TRIE, Trie, write).
|
||||||
|
|
||||||
%% @private
|
|
||||||
write_trie_node(TrieNode) ->
|
write_trie_node(TrieNode) ->
|
||||||
mnesia:write(trie_node, TrieNode, write).
|
mnesia:write(?TRIE_NODE, TrieNode, write).
|
||||||
|
|
||||||
|
|
|
@ -131,5 +131,5 @@ t_delete3(_) ->
|
||||||
end).
|
end).
|
||||||
|
|
||||||
clear_tables() ->
|
clear_tables() ->
|
||||||
lists:foreach(fun mnesia:clear_table/1, [trie, trie_node]).
|
lists:foreach(fun mnesia:clear_table/1, [emqx_trie, emqx_trie_node]).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue