pubsub and dispatcher

This commit is contained in:
Feng Lee 2016-08-10 12:11:52 +08:00
parent af6c779631
commit 33472a7f6e
4 changed files with 29 additions and 158 deletions

View File

@ -124,11 +124,11 @@ unsubscribe(Topic, Subscriber) ->
with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end).
-spec(topics() -> [binary()]).
topics() -> with_pubsub(fun(PubSub) -> PubSub:topics() end).
topics() -> emqttd_router:topics().
-spec(subscribers(iodata()) -> list(subscriber())).
subscribers(Topic) ->
with_pubsub(fun(PubSub) -> PubSub:subscribers(iolist_to_binary(Topic)) end).
emqttd_dispatcher:subscribers(Topic).
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
subscriptions(Subscriber) ->

View File

@ -38,10 +38,7 @@
async_unsubscribe/1, async_unsubscribe/2]).
%% Management API.
-export([setqos/3, topics/0, subscribers/1, is_subscribed/2, subscriptions/1]).
%% Route API
-export([forward/3, dispatch/2]).
-export([setqos/3, is_subscribed/2, subscriptions/1]).
%% Debug API
-export([dump/0]).
@ -98,7 +95,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
%% Dispatch on the local node
route([#mqtt_route{topic = To, node = Node}],
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
emqttd_dispatch:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
%% Forward to other nodes
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
@ -113,32 +110,7 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}.
%% @doc Forward message to another node...
forward(Node, To, Delivery) ->
rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}.
%% @doc Dispatch Message to Subscribers
-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()).
dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
case subscribers(Topic) of
[] ->
dropped(Topic), {ok, Delivery};
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}};
Subscribers ->
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
{ok, Delivery#mqtt_delivery{flows = Flows1}}
end.
dispatch(Pid, Topic, Msg) when is_pid(Pid) ->
Pid ! {dispatch, Topic, Msg};
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
emqttd_sm:dispatch(SubId, Topic, Msg).
topics() -> emqttd_router:topics().
subscribers(Topic) ->
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
rpc:cast(Node, emqttd_dispatch, dispatch, [To, Delivery]), {ok, Delivery}.
subscriptions(Subscriber) ->
lists:map(fun({_, Topic}) ->
@ -159,13 +131,6 @@ dump() ->
{subscription, ets:tab2list(subscription)},
{subproperty, ets:tab2list(subproperty)}].
%% @private
%% @doc Ingore $SYS Messages.
dropped(<<"$SYS/", _/binary>>) ->
ok;
dropped(_Topic) ->
emqttd_metrics:inc('messages/dropped').
%% @doc Unsubscribe
-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
unsubscribe(Topic) when is_binary(Topic) ->
@ -267,7 +232,7 @@ do_subscribe(Topic, Subscriber, Options, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of
[] ->
add_subscription(Subscriber, Topic),
emqttd_dispatcher:async_add_subscriber(Topic, Subscriber),
emqttd_dispatch:async_subscribe(Topic, Subscriber),
ets:insert(subproperty, {{Topic, Subscriber}, Options}),
{ok, monitor_subpid(Subscriber, State)};
[_] ->
@ -280,7 +245,7 @@ add_subscription(Subscriber, Topic) ->
do_unsubscribe(Topic, Subscriber, State) ->
case ets:lookup(subproperty, {Topic, Subscriber}) of
[_] ->
emqttd_dispatcher:async_del_subscriber(Topic, Subscriber),
emqttd_dispatch:async_subscribe(Topic, Subscriber),
del_subscription(Subscriber, Topic),
ets:delete(subproperty, {Topic, Subscriber}),
{ok, case ets:member(subscription, Subscriber) of
@ -296,8 +261,8 @@ del_subscription(Subscriber, Topic) ->
subscriber_down(DownPid, Topic) ->
case ets:lookup(subproperty, {Topic, DownPid}) of
[] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid); %% warning???
[_] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid),
[] -> emqttd_dispatch:async_subscribe(Topic, DownPid); %% warning???
[_] -> emqttd_dispatch:async_subscribe(Topic, DownPid),
ets:delete(subproperty, {Topic, DownPid})
end.

View File

@ -27,26 +27,43 @@
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
pubsub_pool() ->
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([Env]) ->
%% Create ETS Tables
[create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]],
%% Dispatcher Pool
DispatcherMFA = {emqttd_dispatcher, start_link, [Env]},
DispatcherPool = pool_sup(dispatcher, Env, DispatcherMFA),
%% PubSub Pool
{ok, PubSub} = emqttd:conf(pubsub_adapter),
PoolArgs = [pubsub, hash, pool_size(Env), {PubSub, start_link, [Env]}],
PoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs),
{ok, { {one_for_all, 10, 3600}, [PoolSup]} }.
PubSubMFA = {PubSub, start_link, [Env]},
PubSubPool = pool_sup(pubsub, Env, PubSubMFA),
{ok, { {one_for_all, 10, 3600}, [DispatcherPool, PubSubPool]} }.
pool_size(Env) ->
Schedulers = erlang:system_info(schedulers),
proplists:get_value(pool_size, Env, Schedulers).
pool_sup(Name, Env, MFA) ->
Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
%%--------------------------------------------------------------------
%% Create PubSub Tables
%%--------------------------------------------------------------------

View File

@ -1,111 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_submgr).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server2).
-include("emqttd.hrl").
-include("emqttd_internal.hrl").
%% API Exports
-export([start_link/3, add_subscriber/2, async_add_subscriber/2,
del_subscriber/2, async_del_subscriber/2]).
%% gen_server.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {pool, id, env}).
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
start_link(Pool, Id, Env) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
-spec(add_subscriber(binary(), emqttd:subscriber()) -> ok).
add_subscriber(Topic, Subscriber) ->
gen_server2:call(pick(Topic), {add_subscriber, Topic, Subscriber}, infinity).
-spec(async_add_subscriber(binary(), emqttd:subscriber()) -> ok).
async_add_subscriber(Topic, Subscriber) ->
gen_server2:cast(pick(Topic), {add_subscriber, Topic, Subscriber}).
-spec(del_subscriber(binary(), emqttd:subscriber()) -> ok).
del_subscriber(Topic, Subscriber) ->
gen_server2:call(pick(Topic), {del_subscriber, Topic, Subscriber}, infinity).
-spec(async_del_subscriber(binary(), emqttd:subscriber()) -> ok).
async_del_subscriber(Topic, Subscriber) ->
gen_server2:cast(pick(Topic), {del_subscriber, Topic, Subscriber}).
pick(Topic) -> gproc_pool:pick_worker(dispatcher, Topic).
init([Pool, Id, Env]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id, env = Env}}.
handle_call({add_subscriber, Topic, Subscriber}, _From, State) ->
add_subscriber_(Topic, Subscriber),
{reply, ok, State};
handle_call({del_subscriber, Topic, Subscriber}, _From, State) ->
del_subscriber_(Topic, Subscriber),
{reply, ok, State};
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_cast({add_subscriber, Topic, Subscriber}, State) ->
add_subscriber_(Topic, Subscriber),
{reply, ok, State};
handle_cast({del_subscriber, Topic, Subscriber}, State) ->
del_subscriber_(Topic, Subscriber),
{reply, ok, State};
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internel Functions
%%--------------------------------------------------------------------
add_subscriber_(Topic, Subscriber) ->
case ets:member(subscriber, Topic) of
false -> emqttd_router:add_route(Topic, node());
true -> ok
end,
ets:insert(subscriber, {Topic, Subscriber}).
del_subscriber_(Topic, Subscriber) ->
ets:delete_object(subscriber, {Topic, Subscriber}),
case ets:member(subscriber, Topic) of
false -> emqttd_router:del_route(Topic, node());
true -> ok
end.