diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index cdf5cba31..704dbe663 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -26,8 +26,8 @@ -include("emqttd_internal.hrl"). -%% Init And Start --export([init_tabs/0, start_link/3]). +%% Start +-export([start_link/3]). %% PubSub API. -export([subscribe/1, subscribe/2, subscribe/3, publish/2, @@ -54,40 +54,7 @@ -define(PUBSUB, ?MODULE). --define(is_local(Options), lists:member(local, Options)). - --define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). - -%%-------------------------------------------------------------------- -%% Init ETS Tables -%%-------------------------------------------------------------------- - -init_tabs() -> - %% Create ETS Tabs - lists:foreach(fun create_tab/1, [subscriber, subscription, subproperty]). - -create_tab(subscriber) -> - %% Subscriber: Topic -> Sub1, {Share, Sub2}, {Share, Sub3}, ..., SubN - %% duplicate_bag: o(1) insert - ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); - -create_tab(subscription) -> - %% Subscription: Sub -> Topic1, {Share, Topic2}, {Share, Topic3}, ..., TopicN - %% bag: o(n) insert - ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]); - -create_tab(subproperty) -> - %% Subproperty: {Topic, Sub} -> [local, {qos, 1}, {share, <<"share">>}] - ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]). - -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end. - -%%-------------------------------------------------------------------- -%% Start PubSub -%%-------------------------------------------------------------------- - -%% @doc Start one pubsub +%% @doc Start a pubsub server -spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, Env) -> gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []). @@ -110,7 +77,7 @@ subscribe(Topic, Subscriber) when is_binary(Topic) -> subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). -%% @doc Subscribe a Topic Asynchronously +%% @doc Subscribe a Topic Asynchronously -spec(async_subscribe(binary()) -> ok). async_subscribe(Topic) when is_binary(Topic) -> async_subscribe(Topic, self()). @@ -132,6 +99,7 @@ publish(Topic, Msg) when is_binary(Topic) -> route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); + %% Forward to other nodes route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]}); @@ -153,9 +121,9 @@ dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> case subscribers(Topic) of [] -> dropped(Topic), {ok, Delivery}; - [Sub] -> + [Sub] -> %% optimize? dispatch(Sub, Topic, Msg), - {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}}; + {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}}; Subscribers -> Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), @@ -223,8 +191,8 @@ call(PubSub, Req) when is_pid(PubSub) -> cast(PubSub, Msg) when is_pid(PubSub) -> gen_server2:cast(PubSub, Msg). -pick(Topic) -> - gproc_pool:pick_worker(pubsub, Topic). +pick(Subscriber) -> + gproc_pool:pick_worker(pubsub, Subscriber). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -235,13 +203,13 @@ init([Pool, Id, Env]) -> {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - case do_subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe(Topic, Subscriber, Options, State) of {ok, NewState} -> {reply, ok, setstats(NewState)}; {error, Error} -> {reply, {error, Error}, State} end; handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - case do_unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe(Topic, Subscriber, State) of {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; {error, Error} -> {reply, {error, Error}, State} end; @@ -261,13 +229,13 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - case do_subscribe_(Topic, Subscriber, Options, State) of + case do_subscribe(Topic, Subscriber, Options, State) of {ok, NewState} -> {noreply, setstats(NewState)}; {error, _Error} -> {noreply, State} end; handle_cast({unsubscribe, Topic, Subscriber}, State) -> - case do_unsubscribe_(Topic, Subscriber, State) of + case do_unsubscribe(Topic, Subscriber, State) of {ok, NewState} -> {noreply, setstats(NewState), hibernate}; {error, _Error} -> {noreply, State} end; @@ -277,7 +245,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) -> lists:foreach(fun({_, Topic}) -> - subscriber_down_(DownPid, Topic) + subscriber_down(DownPid, Topic) end, ets:lookup(subscription, DownPid)), ets:delete(subscription, DownPid), {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate}; @@ -295,35 +263,24 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- -do_subscribe_(Topic, Subscriber, Options, State) -> +do_subscribe(Topic, Subscriber, Options, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [] -> - do_subscribe2_(Topic, Subscriber, Options), + add_subscription(Subscriber, Topic), + emqttd_dispatcher:async_add_subscriber(Topic, Subscriber), ets:insert(subproperty, {{Topic, Subscriber}, Options}), {ok, monitor_subpid(Subscriber, State)}; [_] -> {error, {already_subscribed, Topic}} end. -do_subscribe2_(Topic, Subscriber, _Options) -> - add_subscription_(Subscriber, Topic), - add_subscriber_(Topic, Subscriber). - -add_subscription_(Subscriber, Topic) -> +add_subscription(Subscriber, Topic) -> ets:insert(subscription, {Subscriber, Topic}). -add_subscriber_(Topic, Subscriber) -> - %%TODO: LOCK here... - case ets:member(subscriber, Topic) of - false -> emqttd_router:add_route(Topic, node()); - true -> ok - end, - ets:insert(subscriber, {Topic, Subscriber}). - -do_unsubscribe_(Topic, Subscriber, State) -> +do_unsubscribe(Topic, Subscriber, State) -> case ets:lookup(subproperty, {Topic, Subscriber}) of [_] -> - del_subscriber_(Topic, Subscriber), + emqttd_dispatcher:async_del_subscriber(Topic, Subscriber), del_subscription(Subscriber, Topic), ets:delete(subproperty, {Topic, Subscriber}), {ok, case ets:member(subscription, Subscriber) of @@ -337,18 +294,10 @@ do_unsubscribe_(Topic, Subscriber, State) -> del_subscription(Subscriber, Topic) -> ets:delete_object(subscription, {Subscriber, Topic}). -del_subscriber_(Topic, Subscriber) -> - ets:delete_object(subscriber, {Topic, Subscriber}), - %%TODO: LOCK TOPIC - case ets:member(subscriber, Topic) of - false -> emqttd_router:del_route(Topic, node()); - true -> ok - end. - -subscriber_down_(DownPid, Topic) -> +subscriber_down(DownPid, Topic) -> case ets:lookup(subproperty, {Topic, DownPid}) of - [] -> del_subscriber_(Topic, DownPid); %%TODO: warning? - [_] -> del_subscriber_(Topic, DownPid), + [] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid); %% warning??? + [_] -> emqttd_dispatcher:async_del_subscriber(Topic, DownPid), ets:delete(subproperty, {Topic, DownPid}) end. @@ -363,11 +312,7 @@ demonitor_subpid(_SubPid, State) -> State. setstats(State) when is_record(State, state) -> - setstats(subscriber), setstats(subscription), State; - -setstats(subscriber) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)); - -setstats(subscription) -> - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)). + emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)), + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)), + State. diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 414868fbf..845aea56e 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -25,6 +25,8 @@ %% Supervisor callbacks -export([init/1]). +-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]). @@ -32,7 +34,11 @@ pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). init([Env]) -> - {ok, PubSub} = emqttd:conf(pubsub_adapter), PubSub:init_tabs(), + %% Create ETS Tables + [create_tab(Tab) || Tab <- [subscriber, subscription, subproperty]], + + %% PubSub Pool + {ok, PubSub} = emqttd:conf(pubsub_adapter), PoolArgs = [pubsub, hash, pool_size(Env), {PubSub, start_link, [Env]}], PoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs), {ok, { {one_for_all, 10, 3600}, [PoolSup]} }. @@ -41,3 +47,24 @@ pool_size(Env) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Env, Schedulers). +%%-------------------------------------------------------------------- +%% Create PubSub Tables +%%-------------------------------------------------------------------- + +create_tab(subscriber) -> + %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN + %% duplicate_bag: o(1) insert + ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + +create_tab(subscription) -> + %% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN + %% bag: o(n) insert + ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]); + +create_tab(subproperty) -> + %% Subproperty: {Topic, Sub} -> [{qos, 1}] + ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end. + diff --git a/src/emqttd_submgr.erl b/src/emqttd_submgr.erl new file mode 100644 index 000000000..1cff03ea9 --- /dev/null +++ b/src/emqttd_submgr.erl @@ -0,0 +1,111 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_submgr). + +-author("Feng Lee "). + +-behaviour(gen_server2). + +-include("emqttd.hrl"). + +-include("emqttd_internal.hrl"). + +%% API Exports +-export([start_link/3, add_subscriber/2, async_add_subscriber/2, + del_subscriber/2, async_del_subscriber/2]). + +%% gen_server. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {pool, id, env}). + +-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). +start_link(Pool, Id, Env) -> + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + +-spec(add_subscriber(binary(), emqttd:subscriber()) -> ok). +add_subscriber(Topic, Subscriber) -> + gen_server2:call(pick(Topic), {add_subscriber, Topic, Subscriber}, infinity). + +-spec(async_add_subscriber(binary(), emqttd:subscriber()) -> ok). +async_add_subscriber(Topic, Subscriber) -> + gen_server2:cast(pick(Topic), {add_subscriber, Topic, Subscriber}). + +-spec(del_subscriber(binary(), emqttd:subscriber()) -> ok). +del_subscriber(Topic, Subscriber) -> + gen_server2:call(pick(Topic), {del_subscriber, Topic, Subscriber}, infinity). + +-spec(async_del_subscriber(binary(), emqttd:subscriber()) -> ok). +async_del_subscriber(Topic, Subscriber) -> + gen_server2:cast(pick(Topic), {del_subscriber, Topic, Subscriber}). + +pick(Topic) -> gproc_pool:pick_worker(dispatcher, Topic). + +init([Pool, Id, Env]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id, env = Env}}. + +handle_call({add_subscriber, Topic, Subscriber}, _From, State) -> + add_subscriber_(Topic, Subscriber), + {reply, ok, State}; + +handle_call({del_subscriber, Topic, Subscriber}, _From, State) -> + del_subscriber_(Topic, Subscriber), + {reply, ok, State}; + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast({add_subscriber, Topic, Subscriber}, State) -> + add_subscriber_(Topic, Subscriber), + {reply, ok, State}; + +handle_cast({del_subscriber, Topic, Subscriber}, State) -> + del_subscriber_(Topic, Subscriber), + {reply, ok, State}; + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internel Functions +%%-------------------------------------------------------------------- + +add_subscriber_(Topic, Subscriber) -> + case ets:member(subscriber, Topic) of + false -> emqttd_router:add_route(Topic, node()); + true -> ok + end, + ets:insert(subscriber, {Topic, Subscriber}). + +del_subscriber_(Topic, Subscriber) -> + ets:delete_object(subscriber, {Topic, Subscriber}), + case ets:member(subscriber, Topic) of + false -> emqttd_router:del_route(Topic, node()); + true -> ok + end. +