From 33472a7f6e9c882876d68270c8172d32f9a0b26a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 10 Aug 2016 12:11:52 +0800 Subject: [PATCH] pubsub and dispatcher --- src/emqttd.erl | 4 +- src/emqttd_pubsub.erl | 49 +++-------------- src/emqttd_pubsub_sup.erl | 23 ++++++-- src/emqttd_submgr.erl | 111 -------------------------------------- 4 files changed, 29 insertions(+), 158 deletions(-) delete mode 100644 src/emqttd_submgr.erl diff --git a/src/emqttd.erl b/src/emqttd.erl index 6914d7993..28d7441e2 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -124,11 +124,11 @@ unsubscribe(Topic, Subscriber) -> with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end). -spec(topics() -> [binary()]). -topics() -> with_pubsub(fun(PubSub) -> PubSub:topics() end). +topics() -> emqttd_router:topics(). -spec(subscribers(iodata()) -> list(subscriber())). subscribers(Topic) -> - with_pubsub(fun(PubSub) -> PubSub:subscribers(iolist_to_binary(Topic)) end). + emqttd_dispatcher:subscribers(Topic). -spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). subscriptions(Subscriber) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 704dbe663..75e2e58d5 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -38,10 +38,7 @@ async_unsubscribe/1, async_unsubscribe/2]). %% Management API. --export([setqos/3, topics/0, subscribers/1, is_subscribed/2, subscriptions/1]). - -%% Route API --export([forward/3, dispatch/2]). +-export([setqos/3, is_subscribed/2, subscriptions/1]). %% Debug API -export([dump/0]). @@ -98,7 +95,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %% Dispatch on the local node route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> - dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); + emqttd_dispatch:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); %% Forward to other nodes route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> @@ -113,32 +110,7 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}. %% @doc Forward message to another node... forward(Node, To, Delivery) -> - rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}. - -%% @doc Dispatch Message to Subscribers --spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()). -dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> - case subscribers(Topic) of - [] -> - dropped(Topic), {ok, Delivery}; - [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), - {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}}; - Subscribers -> - Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], - lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), - {ok, Delivery#mqtt_delivery{flows = Flows1}} - end. - -dispatch(Pid, Topic, Msg) when is_pid(Pid) -> - Pid ! {dispatch, Topic, Msg}; -dispatch(SubId, Topic, Msg) when is_binary(SubId) -> - emqttd_sm:dispatch(SubId, Topic, Msg). - -topics() -> emqttd_router:topics(). - -subscribers(Topic) -> - try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. + rpc:cast(Node, emqttd_dispatch, dispatch, [To, Delivery]), {ok, Delivery}. subscriptions(Subscriber) -> lists:map(fun({_, Topic}) -> @@ -159,13 +131,6 @@ dump() -> {subscription, ets:tab2list(subscription)}, {subproperty, ets:tab2list(subproperty)}]. -%% @private -%% @doc Ingore $SYS Messages. -dropped(<<"$SYS/", _/binary>>) -> - ok; -dropped(_Topic) -> - emqttd_metrics:inc('messages/dropped'). - %% @doc Unsubscribe -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). unsubscribe(Topic) when is_binary(Topic) -> @@ -267,7 +232,7 @@ do_subscribe(Topic, Subscriber, Options, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [] -> add_subscription(Subscriber, Topic), - emqttd_dispatcher:async_add_subscriber(Topic, Subscriber), + emqttd_dispatch:async_subscribe(Topic, Subscriber), ets:insert(subproperty, {{Topic, Subscriber}, Options}), {ok, monitor_subpid(Subscriber, State)}; [_] -> @@ -280,7 +245,7 @@ add_subscription(Subscriber, Topic) -> do_unsubscribe(Topic, Subscriber, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [_] -> - emqttd_dispatcher:async_del_subscriber(Topic, Subscriber), + emqttd_dispatch:async_subscribe(Topic, Subscriber), del_subscription(Subscriber, Topic), ets:delete(subproperty, {Topic, Subscriber}), {ok, case ets:member(subscription, Subscriber) of @@ -296,8 +261,8 @@ del_subscription(Subscriber, Topic) -> subscriber_down(DownPid, Topic) -> case ets:lookup(subproperty, {Topic, DownPid}) of - [] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid); %% warning??? - [_] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid), + [] -> emqttd_dispatch:async_subscribe(Topic, DownPid); %% warning??? + [_] -> emqttd_dispatch:async_subscribe(Topic, DownPid), ets:delete(subproperty, {Topic, DownPid}) end. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 845aea56e..4663151ed 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -27,26 +27,43 @@ -define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]). pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + init([Env]) -> %% Create ETS Tables [create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]], + %% Dispatcher Pool + DispatcherMFA = {emqttd_dispatcher, start_link, [Env]}, + DispatcherPool = pool_sup(dispatcher, Env, DispatcherMFA), + %% PubSub Pool {ok, PubSub} = emqttd:conf(pubsub_adapter), - PoolArgs = [pubsub, hash, pool_size(Env), {PubSub, start_link, [Env]}], - PoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs), - {ok, { {one_for_all, 10, 3600}, [PoolSup]} }. + PubSubMFA = {PubSub, start_link, [Env]}, + PubSubPool = pool_sup(pubsub, Env, PubSubMFA), + + {ok, { {one_for_all, 10, 3600}, [DispatcherPool, PubSubPool]} }. pool_size(Env) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Env, Schedulers). +pool_sup(Name, Env, MFA) -> + Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), + emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). + %%-------------------------------------------------------------------- %% Create PubSub Tables %%-------------------------------------------------------------------- diff --git a/src/emqttd_submgr.erl b/src/emqttd_submgr.erl deleted file mode 100644 index 1cff03ea9..000000000 --- a/src/emqttd_submgr.erl +++ /dev/null @@ -1,111 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_submgr). - --author("Feng Lee "). - --behaviour(gen_server2). - --include("emqttd.hrl"). - --include("emqttd_internal.hrl"). - -%% API Exports --export([start_link/3, add_subscriber/2, async_add_subscriber/2, - del_subscriber/2, async_del_subscriber/2]). - -%% gen_server. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, env}). - --spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). -start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). - --spec(add_subscriber(binary(), emqttd:subscriber()) -> ok). -add_subscriber(Topic, Subscriber) -> - gen_server2:call(pick(Topic), {add_subscriber, Topic, Subscriber}, infinity). - --spec(async_add_subscriber(binary(), emqttd:subscriber()) -> ok). -async_add_subscriber(Topic, Subscriber) -> - gen_server2:cast(pick(Topic), {add_subscriber, Topic, Subscriber}). - --spec(del_subscriber(binary(), emqttd:subscriber()) -> ok). -del_subscriber(Topic, Subscriber) -> - gen_server2:call(pick(Topic), {del_subscriber, Topic, Subscriber}, infinity). - --spec(async_del_subscriber(binary(), emqttd:subscriber()) -> ok). -async_del_subscriber(Topic, Subscriber) -> - gen_server2:cast(pick(Topic), {del_subscriber, Topic, Subscriber}). - -pick(Topic) -> gproc_pool:pick_worker(dispatcher, Topic). - -init([Pool, Id, Env]) -> - ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env}}. - -handle_call({add_subscriber, Topic, Subscriber}, _From, State) -> - add_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_call({del_subscriber, Topic, Subscriber}, _From, State) -> - del_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast({add_subscriber, Topic, Subscriber}, State) -> - add_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_cast({del_subscriber, Topic, Subscriber}, State) -> - del_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internel Functions -%%-------------------------------------------------------------------- - -add_subscriber_(Topic, Subscriber) -> - case ets:member(subscriber, Topic) of - false -> emqttd_router:add_route(Topic, node()); - true -> ok - end, - ets:insert(subscriber, {Topic, Subscriber}). - -del_subscriber_(Topic, Subscriber) -> - ets:delete_object(subscriber, {Topic, Subscriber}), - case ets:member(subscriber, Topic) of - false -> emqttd_router:del_route(Topic, node()); - true -> ok - end. -