improve the pubsub design
This commit is contained in:
parent
4c4d9a718c
commit
af6c779631
|
@ -26,8 +26,8 @@
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
%% Init And Start
|
%% Start
|
||||||
-export([init_tabs/0, start_link/3]).
|
-export([start_link/3]).
|
||||||
|
|
||||||
%% PubSub API.
|
%% PubSub API.
|
||||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/2,
|
-export([subscribe/1, subscribe/2, subscribe/3, publish/2,
|
||||||
|
@ -54,40 +54,7 @@
|
||||||
|
|
||||||
-define(PUBSUB, ?MODULE).
|
-define(PUBSUB, ?MODULE).
|
||||||
|
|
||||||
-define(is_local(Options), lists:member(local, Options)).
|
%% @doc Start a pubsub server
|
||||||
|
|
||||||
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Init ETS Tables
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init_tabs() ->
|
|
||||||
%% Create ETS Tabs
|
|
||||||
lists:foreach(fun create_tab/1, [subscriber, subscription, subproperty]).
|
|
||||||
|
|
||||||
create_tab(subscriber) ->
|
|
||||||
%% Subscriber: Topic -> Sub1, {Share, Sub2}, {Share, Sub3}, ..., SubN
|
|
||||||
%% duplicate_bag: o(1) insert
|
|
||||||
ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
|
|
||||||
|
|
||||||
create_tab(subscription) ->
|
|
||||||
%% Subscription: Sub -> Topic1, {Share, Topic2}, {Share, Topic3}, ..., TopicN
|
|
||||||
%% bag: o(n) insert
|
|
||||||
ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]);
|
|
||||||
|
|
||||||
create_tab(subproperty) ->
|
|
||||||
%% Subproperty: {Topic, Sub} -> [local, {qos, 1}, {share, <<"share">>}]
|
|
||||||
ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]).
|
|
||||||
|
|
||||||
ensure_tab(Tab, Opts) ->
|
|
||||||
case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Start PubSub
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Start one pubsub
|
|
||||||
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
||||||
start_link(Pool, Id, Env) ->
|
start_link(Pool, Id, Env) ->
|
||||||
gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []).
|
gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||||
|
@ -132,6 +99,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
|
||||||
route([#mqtt_route{topic = To, node = Node}],
|
route([#mqtt_route{topic = To, node = Node}],
|
||||||
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
|
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
|
||||||
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
|
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
|
||||||
|
|
||||||
%% Forward to other nodes
|
%% Forward to other nodes
|
||||||
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
|
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
|
||||||
forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]});
|
forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]});
|
||||||
|
@ -153,9 +121,9 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
|
||||||
case subscribers(Topic) of
|
case subscribers(Topic) of
|
||||||
[] ->
|
[] ->
|
||||||
dropped(Topic), {ok, Delivery};
|
dropped(Topic), {ok, Delivery};
|
||||||
[Sub] ->
|
[Sub] -> %% optimize?
|
||||||
dispatch(Sub, Topic, Msg),
|
dispatch(Sub, Topic, Msg),
|
||||||
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}};
|
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}};
|
||||||
Subscribers ->
|
Subscribers ->
|
||||||
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
|
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
|
||||||
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
|
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
|
||||||
|
@ -223,8 +191,8 @@ call(PubSub, Req) when is_pid(PubSub) ->
|
||||||
cast(PubSub, Msg) when is_pid(PubSub) ->
|
cast(PubSub, Msg) when is_pid(PubSub) ->
|
||||||
gen_server2:cast(PubSub, Msg).
|
gen_server2:cast(PubSub, Msg).
|
||||||
|
|
||||||
pick(Topic) ->
|
pick(Subscriber) ->
|
||||||
gproc_pool:pick_worker(pubsub, Topic).
|
gproc_pool:pick_worker(pubsub, Subscriber).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server Callbacks
|
%% gen_server Callbacks
|
||||||
|
@ -235,13 +203,13 @@ init([Pool, Id, Env]) ->
|
||||||
{ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
{ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
||||||
|
|
||||||
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
||||||
case do_subscribe_(Topic, Subscriber, Options, State) of
|
case do_subscribe(Topic, Subscriber, Options, State) of
|
||||||
{ok, NewState} -> {reply, ok, setstats(NewState)};
|
{ok, NewState} -> {reply, ok, setstats(NewState)};
|
||||||
{error, Error} -> {reply, {error, Error}, State}
|
{error, Error} -> {reply, {error, Error}, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
||||||
case do_unsubscribe_(Topic, Subscriber, State) of
|
case do_unsubscribe(Topic, Subscriber, State) of
|
||||||
{ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
{ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
||||||
{error, Error} -> {reply, {error, Error}, State}
|
{error, Error} -> {reply, {error, Error}, State}
|
||||||
end;
|
end;
|
||||||
|
@ -261,13 +229,13 @@ handle_call(Req, _From, State) ->
|
||||||
?UNEXPECTED_REQ(Req, State).
|
?UNEXPECTED_REQ(Req, State).
|
||||||
|
|
||||||
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
||||||
case do_subscribe_(Topic, Subscriber, Options, State) of
|
case do_subscribe(Topic, Subscriber, Options, State) of
|
||||||
{ok, NewState} -> {noreply, setstats(NewState)};
|
{ok, NewState} -> {noreply, setstats(NewState)};
|
||||||
{error, _Error} -> {noreply, State}
|
{error, _Error} -> {noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
||||||
case do_unsubscribe_(Topic, Subscriber, State) of
|
case do_unsubscribe(Topic, Subscriber, State) of
|
||||||
{ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
{ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
||||||
{error, _Error} -> {noreply, State}
|
{error, _Error} -> {noreply, State}
|
||||||
end;
|
end;
|
||||||
|
@ -277,7 +245,7 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
||||||
lists:foreach(fun({_, Topic}) ->
|
lists:foreach(fun({_, Topic}) ->
|
||||||
subscriber_down_(DownPid, Topic)
|
subscriber_down(DownPid, Topic)
|
||||||
end, ets:lookup(subscription, DownPid)),
|
end, ets:lookup(subscription, DownPid)),
|
||||||
ets:delete(subscription, DownPid),
|
ets:delete(subscription, DownPid),
|
||||||
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
||||||
|
@ -295,35 +263,24 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
do_subscribe_(Topic, Subscriber, Options, State) ->
|
do_subscribe(Topic, Subscriber, Options, State) ->
|
||||||
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
||||||
[] ->
|
[] ->
|
||||||
do_subscribe2_(Topic, Subscriber, Options),
|
add_subscription(Subscriber, Topic),
|
||||||
|
emqttd_dispatcher:async_add_subscriber(Topic, Subscriber),
|
||||||
ets:insert(subproperty, {{Topic, Subscriber}, Options}),
|
ets:insert(subproperty, {{Topic, Subscriber}, Options}),
|
||||||
{ok, monitor_subpid(Subscriber, State)};
|
{ok, monitor_subpid(Subscriber, State)};
|
||||||
[_] ->
|
[_] ->
|
||||||
{error, {already_subscribed, Topic}}
|
{error, {already_subscribed, Topic}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_subscribe2_(Topic, Subscriber, _Options) ->
|
add_subscription(Subscriber, Topic) ->
|
||||||
add_subscription_(Subscriber, Topic),
|
|
||||||
add_subscriber_(Topic, Subscriber).
|
|
||||||
|
|
||||||
add_subscription_(Subscriber, Topic) ->
|
|
||||||
ets:insert(subscription, {Subscriber, Topic}).
|
ets:insert(subscription, {Subscriber, Topic}).
|
||||||
|
|
||||||
add_subscriber_(Topic, Subscriber) ->
|
do_unsubscribe(Topic, Subscriber, State) ->
|
||||||
%%TODO: LOCK here...
|
|
||||||
case ets:member(subscriber, Topic) of
|
|
||||||
false -> emqttd_router:add_route(Topic, node());
|
|
||||||
true -> ok
|
|
||||||
end,
|
|
||||||
ets:insert(subscriber, {Topic, Subscriber}).
|
|
||||||
|
|
||||||
do_unsubscribe_(Topic, Subscriber, State) ->
|
|
||||||
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
||||||
[_] ->
|
[_] ->
|
||||||
del_subscriber_(Topic, Subscriber),
|
emqttd_dispatcher:async_del_subscriber(Topic, Subscriber),
|
||||||
del_subscription(Subscriber, Topic),
|
del_subscription(Subscriber, Topic),
|
||||||
ets:delete(subproperty, {Topic, Subscriber}),
|
ets:delete(subproperty, {Topic, Subscriber}),
|
||||||
{ok, case ets:member(subscription, Subscriber) of
|
{ok, case ets:member(subscription, Subscriber) of
|
||||||
|
@ -337,18 +294,10 @@ do_unsubscribe_(Topic, Subscriber, State) ->
|
||||||
del_subscription(Subscriber, Topic) ->
|
del_subscription(Subscriber, Topic) ->
|
||||||
ets:delete_object(subscription, {Subscriber, Topic}).
|
ets:delete_object(subscription, {Subscriber, Topic}).
|
||||||
|
|
||||||
del_subscriber_(Topic, Subscriber) ->
|
subscriber_down(DownPid, Topic) ->
|
||||||
ets:delete_object(subscriber, {Topic, Subscriber}),
|
|
||||||
%%TODO: LOCK TOPIC
|
|
||||||
case ets:member(subscriber, Topic) of
|
|
||||||
false -> emqttd_router:del_route(Topic, node());
|
|
||||||
true -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
subscriber_down_(DownPid, Topic) ->
|
|
||||||
case ets:lookup(subproperty, {Topic, DownPid}) of
|
case ets:lookup(subproperty, {Topic, DownPid}) of
|
||||||
[] -> del_subscriber_(Topic, DownPid); %%TODO: warning?
|
[] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid); %% warning???
|
||||||
[_] -> del_subscriber_(Topic, DownPid),
|
[_] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid),
|
||||||
ets:delete(subproperty, {Topic, DownPid})
|
ets:delete(subproperty, {Topic, DownPid})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -363,11 +312,7 @@ demonitor_subpid(_SubPid, State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
setstats(State) when is_record(State, state) ->
|
setstats(State) when is_record(State, state) ->
|
||||||
setstats(subscriber), setstats(subscription), State;
|
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)),
|
||||||
|
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)),
|
||||||
setstats(subscriber) ->
|
State.
|
||||||
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size));
|
|
||||||
|
|
||||||
setstats(subscription) ->
|
|
||||||
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)).
|
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,8 @@
|
||||||
%% Supervisor callbacks
|
%% Supervisor callbacks
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
|
||||||
|
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
|
||||||
|
|
||||||
|
@ -32,7 +34,11 @@ pubsub_pool() ->
|
||||||
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
|
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
|
||||||
|
|
||||||
init([Env]) ->
|
init([Env]) ->
|
||||||
{ok, PubSub} = emqttd:conf(pubsub_adapter), PubSub:init_tabs(),
|
%% Create ETS Tables
|
||||||
|
[create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]],
|
||||||
|
|
||||||
|
%% PubSub Pool
|
||||||
|
{ok, PubSub} = emqttd:conf(pubsub_adapter),
|
||||||
PoolArgs = [pubsub, hash, pool_size(Env), {PubSub, start_link, [Env]}],
|
PoolArgs = [pubsub, hash, pool_size(Env), {PubSub, start_link, [Env]}],
|
||||||
PoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs),
|
PoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs),
|
||||||
{ok, { {one_for_all, 10, 3600}, [PoolSup]} }.
|
{ok, { {one_for_all, 10, 3600}, [PoolSup]} }.
|
||||||
|
@ -41,3 +47,24 @@ pool_size(Env) ->
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
proplists:get_value(pool_size, Env, Schedulers).
|
proplists:get_value(pool_size, Env, Schedulers).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Create PubSub Tables
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
create_tab(subscriber) ->
|
||||||
|
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
|
||||||
|
%% duplicate_bag: o(1) insert
|
||||||
|
ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
|
||||||
|
|
||||||
|
create_tab(subscription) ->
|
||||||
|
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
|
||||||
|
%% bag: o(n) insert
|
||||||
|
ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]);
|
||||||
|
|
||||||
|
create_tab(subproperty) ->
|
||||||
|
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
|
||||||
|
ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]).
|
||||||
|
|
||||||
|
ensure_tab(Tab, Opts) ->
|
||||||
|
case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqttd_submgr).
|
||||||
|
|
||||||
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
|
-behaviour(gen_server2).
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
|
%% API Exports
|
||||||
|
-export([start_link/3, add_subscriber/2, async_add_subscriber/2,
|
||||||
|
del_subscriber/2, async_del_subscriber/2]).
|
||||||
|
|
||||||
|
%% gen_server.
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {pool, id, env}).
|
||||||
|
|
||||||
|
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
||||||
|
start_link(Pool, Id, Env) ->
|
||||||
|
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||||
|
|
||||||
|
-spec(add_subscriber(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
add_subscriber(Topic, Subscriber) ->
|
||||||
|
gen_server2:call(pick(Topic), {add_subscriber, Topic, Subscriber}, infinity).
|
||||||
|
|
||||||
|
-spec(async_add_subscriber(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
async_add_subscriber(Topic, Subscriber) ->
|
||||||
|
gen_server2:cast(pick(Topic), {add_subscriber, Topic, Subscriber}).
|
||||||
|
|
||||||
|
-spec(del_subscriber(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
del_subscriber(Topic, Subscriber) ->
|
||||||
|
gen_server2:call(pick(Topic), {del_subscriber, Topic, Subscriber}, infinity).
|
||||||
|
|
||||||
|
-spec(async_del_subscriber(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
async_del_subscriber(Topic, Subscriber) ->
|
||||||
|
gen_server2:cast(pick(Topic), {del_subscriber, Topic, Subscriber}).
|
||||||
|
|
||||||
|
pick(Topic) -> gproc_pool:pick_worker(dispatcher, Topic).
|
||||||
|
|
||||||
|
init([Pool, Id, Env]) ->
|
||||||
|
?GPROC_POOL(join, Pool, Id),
|
||||||
|
{ok, #state{pool = Pool, id = Id, env = Env}}.
|
||||||
|
|
||||||
|
handle_call({add_subscriber, Topic, Subscriber}, _From, State) ->
|
||||||
|
add_subscriber_(Topic, Subscriber),
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
|
handle_call({del_subscriber, Topic, Subscriber}, _From, State) ->
|
||||||
|
del_subscriber_(Topic, Subscriber),
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?UNEXPECTED_REQ(Req, State).
|
||||||
|
|
||||||
|
handle_cast({add_subscriber, Topic, Subscriber}, State) ->
|
||||||
|
add_subscriber_(Topic, Subscriber),
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
|
handle_cast({del_subscriber, Topic, Subscriber}, State) ->
|
||||||
|
del_subscriber_(Topic, Subscriber),
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?UNEXPECTED_INFO(Info, State).
|
||||||
|
|
||||||
|
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||||
|
?GPROC_POOL(leave, Pool, Id).
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internel Functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
add_subscriber_(Topic, Subscriber) ->
|
||||||
|
case ets:member(subscriber, Topic) of
|
||||||
|
false -> emqttd_router:add_route(Topic, node());
|
||||||
|
true -> ok
|
||||||
|
end,
|
||||||
|
ets:insert(subscriber, {Topic, Subscriber}).
|
||||||
|
|
||||||
|
del_subscriber_(Topic, Subscriber) ->
|
||||||
|
ets:delete_object(subscriber, {Topic, Subscriber}),
|
||||||
|
case ets:member(subscriber, Topic) of
|
||||||
|
false -> emqttd_router:del_route(Topic, node());
|
||||||
|
true -> ok
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue