From 38daaa2f5ca22afe12ab400929c848580656131a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 10 Aug 2016 21:29:55 +0800 Subject: [PATCH] server -> pubsub -> router --- include/emqttd.hrl | 9 +- src/emqttd.erl | 40 ++---- src/emqttd_dispatcher.erl | 150 -------------------- src/emqttd_pubsub.erl | 276 +++++++++++------------------------- src/emqttd_pubsub_sup.erl | 19 ++- src/emqttd_server.erl | 286 ++++++++++++++++++++++++++++++++++++++ src/emqttd_session.erl | 16 +-- src/emqttd_sm.erl | 4 +- 8 files changed, 397 insertions(+), 403 deletions(-) delete mode 100644 src/emqttd_dispatcher.erl create mode 100644 src/emqttd_server.erl diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 6142407d1..edd403f9b 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -70,7 +70,7 @@ %% MQTT Subscription %%-------------------------------------------------------------------- -record(mqtt_subscription, { - subid :: binary() | atom(), + subid :: binary() | atom() | pid(), topic :: binary(), qos = 0 :: 0 | 1 | 2 }). @@ -119,10 +119,11 @@ %%-------------------------------------------------------------------- %% MQTT Session %%-------------------------------------------------------------------- + -record(mqtt_session, { - client_id :: binary(), - sess_pid :: pid(), - persistent :: boolean() + client_id :: binary(), + sess_pid :: pid(), + persistent :: boolean() }). -type(mqtt_session() :: #mqtt_session{}). diff --git a/src/emqttd.erl b/src/emqttd.erl index 28d7441e2..a741adc59 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -95,24 +95,12 @@ subscribe(Topic, Subscriber) -> -spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()). subscribe(Topic, Subscriber, Options) -> - with_pubsub(fun(PubSub) -> PubSub:subscribe(iolist_to_binary(Topic), Subscriber, Options) end). + emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options). %% @doc Publish MQTT Message -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Msg = #mqtt_message{from = From}) -> - trace(publish, From, Msg), - case run_hooks('message.publish', [], Msg) of - {ok, Msg1 = #mqtt_message{topic = Topic}} -> - %% Retain message first. Don't create retained topic. - Msg2 = case emqttd_retainer:retain(Msg1) of - ok -> emqttd_message:unset_flag(Msg1); - ignore -> Msg1 - end, - with_pubsub(fun(PubSub) -> PubSub:publish(Topic, Msg2) end); - {stop, Msg1} -> - lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]), - ignore - end. +publish(Msg = #mqtt_message{topic = Topic}) -> + emqttd_server:publish(Topic, Msg). %% @doc Unsubscribe -spec(unsubscribe(iodata()) -> ok | pubsub_error()). @@ -121,22 +109,18 @@ unsubscribe(Topic) -> -spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()). unsubscribe(Topic, Subscriber) -> - with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end). + emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber). -spec(topics() -> [binary()]). topics() -> emqttd_router:topics(). -spec(subscribers(iodata()) -> list(subscriber())). subscribers(Topic) -> - emqttd_dispatcher:subscribers(Topic). + emqttd_pubsub:subscribers(iolist_to_binary(Topic)). -spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). subscriptions(Subscriber) -> - with_pubsub(fun(PubSub) -> PubSub:subscriptions(Subscriber) end). - -with_pubsub(Fun) -> {ok, PubSub} = conf(pubsub_adapter), Fun(PubSub). - -dump() -> with_pubsub(fun(PubSub) -> lists:append(PubSub:dump(), emqttd_router:dump()) end). + emqttd_server:get_subscriptions(Subscriber). %%-------------------------------------------------------------------- %% Hooks API @@ -158,15 +142,9 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). + %%-------------------------------------------------------------------- -%% Trace Functions +%% Debug %%-------------------------------------------------------------------- -trace(publish, From, _Msg) when is_atom(From) -> - %% Dont' trace '$SYS' publish - ignore; - -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). - +dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]). diff --git a/src/emqttd_dispatcher.erl b/src/emqttd_dispatcher.erl deleted file mode 100644 index 44e44219a..000000000 --- a/src/emqttd_dispatcher.erl +++ /dev/null @@ -1,150 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_dispatcher). - --author("Feng Lee "). - --behaviour(gen_server2). - --include("emqttd.hrl"). - --include("emqttd_internal.hrl"). - -%% API Exports --export([start_link/3, subscribe/2, unsubscribe/2, dispatch/2, - async_subscribe/2, async_unsubscribe/2]). - --export([subscribers/1]). - -%% gen_server. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, env}). - --spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). -start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). - --spec(subscribe(binary(), emqttd:subscriber()) -> ok). -subscribe(Topic, Subscriber) -> - call(pick(Topic), {subscribe, Topic, Subscriber}). - --spec(async_subscribe(binary(), emqttd:subscriber()) -> ok). -async_subscribe(Topic, Subscriber) -> - cast(pick(Topic), {subscribe, Topic, Subscriber}). - -%% @doc Dispatch Message to Subscribers --spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()). -dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> - case subscribers(Topic) of - [] -> - dropped(Topic), {ok, Delivery}; - [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), - {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}}; - Subscribers -> - Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], - lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), - {ok, Delivery#mqtt_delivery{flows = Flows1}} - end. - -dispatch(Pid, Topic, Msg) when is_pid(Pid) -> - Pid ! {dispatch, Topic, Msg}; -dispatch(SubId, Topic, Msg) when is_binary(SubId) -> - emqttd_sm:dispatch(SubId, Topic, Msg). - -subscribers(Topic) -> - try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. - -%% @private -%% @doc Ingore $SYS Messages. -dropped(<<"$SYS/", _/binary>>) -> - ok; -dropped(_Topic) -> - emqttd_metrics:inc('messages/dropped'). - --spec(unsubscribe(binary(), emqttd:subscriber()) -> ok). -unsubscribe(Topic, Subscriber) -> - call(pick(Topic), {unsubscribe, Topic, Subscriber}). - --spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok). -async_unsubscribe(Topic, Subscriber) -> - cast(pick(Topic), {unsubscribe, Topic, Subscriber}). - -call(Server, Req) -> - gen_server2:call(Server, Req, infinity). - -cast(Server, Msg) -> - gen_server2:cast(Server, Msg). - -pick(Topic) -> - gproc_pool:pick_worker(dispatcher, Topic). - -init([Pool, Id, Env]) -> - ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env}}. - -handle_call({subscribe, Topic, Subscriber}, _From, State) -> - add_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - del_subscriber_(Topic, Subscriber), - {reply, ok, State}; - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast({subscribe, Topic, Subscriber}, State) -> - add_subscriber_(Topic, Subscriber), - {noreply, State}; - -handle_cast({unsubscribe, Topic, Subscriber}, State) -> - del_subscriber_(Topic, Subscriber), - {noreply, State}; - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internel Functions -%%-------------------------------------------------------------------- - -add_subscriber_(Topic, Subscriber) -> - case ets:member(subscriber, Topic) of - false -> emqttd_router:add_route(Topic, node()); - true -> ok - end, - ets:insert(subscriber, {Topic, Subscriber}). - -del_subscriber_(Topic, Subscriber) -> - ets:delete_object(subscriber, {Topic, Subscriber}), - case ets:member(subscriber, Topic) of - false -> emqttd_router:del_route(Topic, node()); - true -> ok - end. - diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 9c9524ee6..cd58d2b95 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -16,93 +16,46 @@ -module(emqttd_pubsub). --author("Feng Lee "). - -behaviour(gen_server2). -include("emqttd.hrl"). --include("emqttd_protocol.hrl"). - -include("emqttd_internal.hrl"). -%% Start --export([start_link/3]). +%% API Exports +-export([start_link/3, subscribe/2, unsubscribe/2, publish/2, + async_subscribe/2, async_unsubscribe/2]). -%% PubSub API. --export([subscribe/1, subscribe/2, subscribe/3, publish/2, - unsubscribe/1, unsubscribe/2]). - -%% Async PubSub API. --export([async_subscribe/1, async_subscribe/2, async_subscribe/3, - async_unsubscribe/1, async_unsubscribe/2]). - --export([subscriber_down/1]). - -%% Management API. --export([setqos/3, is_subscribed/2, subscriptions/1]). - -%% Debug API --export([dump/0]). +-export([subscribers/1]). %% gen_server. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}). +-record(state, {pool, id, env}). -define(PUBSUB, ?MODULE). --define(Dispatcher, emqttd_dispatcher). - -%% @doc Start a pubsub server -spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}). start_link(Pool, Id, Env) -> - gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []). + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). -%%-------------------------------------------------------------------- -%% PubSub API -%%-------------------------------------------------------------------- - -%% @doc Subscribe a Topic --spec(subscribe(binary()) -> ok | emqttd:pubsub_error()). -subscribe(Topic) when is_binary(Topic) -> - subscribe(Topic, self()). - --spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). -subscribe(Topic, Subscriber) when is_binary(Topic) -> - subscribe(Topic, Subscriber, []). - --spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> - ok | emqttd:pubsub_error()). -subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> - call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). - -%% @doc Subscribe a Topic Asynchronously --spec(async_subscribe(binary()) -> ok). -async_subscribe(Topic) when is_binary(Topic) -> - async_subscribe(Topic, self()). +-spec(subscribe(binary(), emqttd:subscriber()) -> ok). +subscribe(Topic, Subscriber) -> + call(pick(Topic), {subscribe, Topic, Subscriber}). -spec(async_subscribe(binary(), emqttd:subscriber()) -> ok). -async_subscribe(Topic, Subscriber) when is_binary(Topic) -> - async_subscribe(Topic, Subscriber, []). +async_subscribe(Topic, Subscriber) -> + cast(pick(Topic), {subscribe, Topic, Subscriber}). --spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). -async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> - cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). - -subscriber_down(Subscriber) -> - cast(pick(Subscriber), {down, Subscriber}). - -%% @doc Publish message to Topic. -spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). -publish(Topic, Msg) when is_binary(Topic) -> +publish(Topic, Msg) -> route(emqttd_router:match(Topic), delivery(Msg)). %% Dispatch on the local node route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> - ?Dispatcher:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); + dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]}); %% Forward to other nodes route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) -> @@ -117,110 +70,81 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}. %% @doc Forward message to another node... forward(Node, To, Delivery) -> - rpc:cast(Node, ?Dispatcher, dispatch, [To, Delivery]), {ok, Delivery}. + rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}. -subscriptions(Subscriber) -> - lists:map(fun({_, Topic}) -> - subscription(Topic, Subscriber) - end, ets:lookup(mqtt_subscription, Subscriber)). +%% @doc Dispatch Message to Subscribers +-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()). +dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) -> + case subscribers(Topic) of + [] -> + dropped(Topic), {ok, Delivery}; + [Sub] -> %% optimize? + dispatch(Sub, Topic, Msg), + {ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}}; + Subscribers -> + Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows], + lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers), + {ok, Delivery#mqtt_delivery{flows = Flows1}} + end. -subscription(Topic, Subscriber) -> - {Topic, ets:lookup_element(mqtt_pubsub, {Topic, Subscriber}, 2)}. +dispatch(Pid, Topic, Msg) when is_pid(Pid) -> + Pid ! {dispatch, Topic, Msg}; +dispatch(SubId, Topic, Msg) when is_binary(SubId) -> + emqttd_sm:dispatch(SubId, Topic, Msg). -is_subscribed(Topic, Subscriber) when is_binary(Topic) -> - ets:member(mqtt_pubsub, {Topic, Subscriber}). +subscribers(Topic) -> + try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end. -setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> - call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). +%% @private +%% @doc Ingore $SYS Messages. +dropped(<<"$SYS/", _/binary>>) -> + ok; +dropped(_Topic) -> + emqttd_metrics:inc('messages/dropped'). -dump() -> - [{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_pubsub, mqtt_subscription, mqtt_subscriber]]. - -%% @doc Unsubscribe --spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). -unsubscribe(Topic) when is_binary(Topic) -> - unsubscribe(Topic, self()). - -%% @doc Unsubscribe --spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). -unsubscribe(Topic, Subscriber) when is_binary(Topic) -> - call(pick(Subscriber), {unsubscribe, Topic, Subscriber}). - -%% @doc Async Unsubscribe --spec(async_unsubscribe(binary()) -> ok). -async_unsubscribe(Topic) when is_binary(Topic) -> - async_unsubscribe(Topic, self()). +-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok). +unsubscribe(Topic, Subscriber) -> + call(pick(Topic), {unsubscribe, Topic, Subscriber}). -spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok). -async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> - cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}). +async_unsubscribe(Topic, Subscriber) -> + cast(pick(Topic), {unsubscribe, Topic, Subscriber}). -call(PubSub, Req) when is_pid(PubSub) -> - gen_server2:call(PubSub, Req, infinity). +call(Server, Req) -> + gen_server2:call(Server, Req, infinity). -cast(PubSub, Msg) when is_pid(PubSub) -> - gen_server2:cast(PubSub, Msg). +cast(Server, Msg) -> + gen_server2:cast(Server, Msg). -pick(Subscriber) -> - gproc_pool:pick_worker(pubsub, Subscriber). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- +pick(Topic) -> + gproc_pool:pick_worker(pubsub, Topic). init([Pool, Id, Env]) -> ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. + {ok, #state{pool = Pool, id = Id, env = Env}}. -handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> - case do_subscribe(Topic, Subscriber, Options, State) of - {ok, NewState} -> {reply, ok, setstats(NewState)}; - {error, Error} -> {reply, {error, Error}, State} - end; +handle_call({subscribe, Topic, Subscriber}, _From, State) -> + add_subscriber_(Topic, Subscriber), + {reply, ok, setstats(State)}; handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> - case do_unsubscribe(Topic, Subscriber, State) of - {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; - {error, Error} -> {reply, {error, Error}, State} - end; - -handle_call({setqos, Topic, Subscriber, Qos}, _From, State) -> - Key = {Topic, Subscriber}, - case ets:lookup(mqtt_pubsub, Key) of - [{_, Opts}] -> - Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts), - ets:insert(mqtt_pubsub, {Key, Opts1}), - {reply, ok, State}; - [] -> - {reply, {error, {subscription_not_found, Topic}}, State} - end; + del_subscriber_(Topic, Subscriber), + {reply, ok, setstats(State)}; handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, Topic, Subscriber, Options}, State) -> - case do_subscribe(Topic, Subscriber, Options, State) of - {ok, NewState} -> {noreply, setstats(NewState)}; - {error, _Error} -> {noreply, State} - end; +handle_cast({subscribe, Topic, Subscriber}, State) -> + add_subscriber_(Topic, Subscriber), + {noreply, setstats(State)}; handle_cast({unsubscribe, Topic, Subscriber}, State) -> - case do_unsubscribe(Topic, Subscriber, State) of - {ok, NewState} -> {noreply, setstats(NewState), hibernate}; - {error, _Error} -> {noreply, State} - end; - -handle_cast({down, Subscriber}, State) -> - subscriber_down_(Subscriber), - {noreply, State}; + del_subscriber_(Topic, Subscriber), + {noreply, setstats(State)}; handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) -> - subscriber_down_(DownPid), - {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate}; - handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). @@ -228,71 +152,27 @@ terminate(_Reason, #state{pool = Pool, id = Id}) -> ?GPROC_POOL(leave, Pool, Id). code_change(_OldVsn, State, _Extra) -> - {ok, State}. + {ok, State}. %%-------------------------------------------------------------------- -%% Internal Functions +%% Internel Functions %%-------------------------------------------------------------------- -do_subscribe(Topic, Subscriber, Options, State) -> - case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of - [] -> - ?Dispatcher:async_subscribe(Topic, Subscriber), - add_subscription(Subscriber, Topic), - ets:insert(mqtt_pubsub, {{Topic, Subscriber}, Options}), - {ok, monitor_subpid(Subscriber, State)}; - [_] -> - {error, {already_subscribed, Topic}} +add_subscriber_(Topic, Subscriber) -> + case ets:member(mqtt_subscriber, Topic) of + false -> emqttd_router:add_route(Topic, node()); + true -> ok + end, + ets:insert(subscriber, {Topic, Subscriber}). + +del_subscriber_(Topic, Subscriber) -> + ets:delete_object(mqtt_subscriber, {Topic, Subscriber}), + case ets:member(mqtt_subscriber, Topic) of + false -> emqttd_router:del_route(Topic, node()); + true -> ok end. -add_subscription(Subscriber, Topic) -> - ets:insert(mqtt_subscription, {Subscriber, Topic}). - -do_unsubscribe(Topic, Subscriber, State) -> - case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of - [_] -> - ?Dispatcher:async_unsubscribe(Topic, Subscriber), - del_subscription(Subscriber, Topic), - ets:delete(mqtt_pubsub, {Topic, Subscriber}), - {ok, case ets:member(mqtt_subscription, Subscriber) of - true -> State; - false -> demonitor_subpid(Subscriber, State) - end}; - [] -> - {error, {subscription_not_found, Topic}} - end. - -del_subscription(Subscriber, Topic) -> - ets:delete_object(mqtt_subscription, {Subscriber, Topic}). - -subscriber_down_(Subscriber) -> - lists:foreach(fun({_, Topic}) -> - subscriber_down_(Subscriber, Topic) - end, ets:lookup(mqtt_subscription, Subscriber)), - ets:delete(mqtt_subscription, Subscriber). - -subscriber_down_(DownPid, Topic) -> - case ets:lookup(mqtt_pubsub, {Topic, DownPid}) of - [] -> - %% here? - ?Dispatcher:async_unsubscribe(Topic, DownPid); - [_] -> - ?Dispatcher:async_unsubscribe(Topic, DownPid), - ets:delete(mqtt_pubsub, {Topic, DownPid}) - end. - -monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> - State#state{submon = PMon:monitor(SubPid)}; -monitor_subpid(_SubPid, State) -> - State. - -demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> - State#state{submon = PMon:demonitor(SubPid)}; -demonitor_subpid(_SubPid, State) -> - State. - setstats(State) when is_record(State, state) -> - emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)), - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(mqtt_subscription, size)), - State. + emqttd_stats:setstats('subscribers/count', 'subscribers/max', + ets:info(mqtt_subscriber, size)). diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 9b794dcfd..167ad4942 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -43,18 +43,17 @@ pubsub_pool() -> init([Env]) -> %% Create ETS Tables - [create_tab(Tab) || Tab <- [mqtt_pubsub, mqtt_subscriber, mqtt_subscription]], - - %% Dispatcher Pool - DispatcherMFA = {emqttd_dispatcher, start_link, [Env]}, - DispatcherPool = pool_sup(dispatcher, Env, DispatcherMFA), + [create_tab(Tab) || Tab <- [mqtt_subpropery, mqtt_subscriber, mqtt_subscription]], %% PubSub Pool - {ok, PubSub} = emqttd:conf(pubsub_adapter), - PubSubMFA = {PubSub, start_link, [Env]}, + PubSubMFA = {emqttd_pubsub, start_link, [Env]}, PubSubPool = pool_sup(pubsub, Env, PubSubMFA), - {ok, { {one_for_all, 10, 3600}, [DispatcherPool, PubSubPool]} }. + %% Server Pool + ServerMFA = {emqttd_server, start_link, [Env]}, + ServerPool = pool_sup(server, Env, ServerMFA), + + {ok, { {one_for_all, 10, 3600}, [PubSubPool, ServerPool]} }. pool_size(Env) -> Schedulers = erlang:system_info(schedulers), @@ -68,9 +67,9 @@ pool_sup(Name, Env, MFA) -> %% Create PubSub Tables %%-------------------------------------------------------------------- -create_tab(mqtt_pubsub) -> +create_tab(mqtt_subproperty) -> %% Subproperty: {Topic, Sub} -> [{qos, 1}] - ensure_tab(mqtt_pubsub, [public, named_table, set | ?CONCURRENCY_OPTS]); + ensure_tab(mqtt_subproperty, [public, named_table, set | ?CONCURRENCY_OPTS]); create_tab(mqtt_subscriber) -> %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl new file mode 100644 index 000000000..9feb3eb98 --- /dev/null +++ b/src/emqttd_server.erl @@ -0,0 +1,286 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_server). + +-author("Feng Lee "). + +-behaviour(gen_server2). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-include("emqttd_internal.hrl"). + +-export([start_link/3]). + +%% PubSub API. +-export([subscribe/1, subscribe/2, subscribe/3, publish/2, + unsubscribe/1, unsubscribe/2]). + +%% Async PubSub API. +-export([async_subscribe/1, async_subscribe/2, async_subscribe/3, + async_unsubscribe/1, async_unsubscribe/2]). + +%% Management API. +-export([setqos/3, is_subscribed/2, get_subscriptions/1, subscriber_down/1]). + +%% Debug API +-export([dump/0]). + +%% gen_server. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}). + +%% @doc Start server +-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}). +start_link(Pool, Id, Env) -> + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + +%%-------------------------------------------------------------------- +%% PubSub API +%%-------------------------------------------------------------------- + +%% @doc Subscribe a Topic +-spec(subscribe(binary()) -> ok | emqttd:pubsub_error()). +subscribe(Topic) when is_binary(Topic) -> + subscribe(Topic, self()). + +-spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). +subscribe(Topic, Subscriber) when is_binary(Topic) -> + subscribe(Topic, Subscriber, []). + +-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> + ok | emqttd:pubsub_error()). +subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> + call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). + +%% @doc Subscribe a Topic Asynchronously +-spec(async_subscribe(binary()) -> ok). +async_subscribe(Topic) when is_binary(Topic) -> + async_subscribe(Topic, self()). + +-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok). +async_subscribe(Topic, Subscriber) when is_binary(Topic) -> + async_subscribe(Topic, Subscriber, []). + +-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> + cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). + +%% @doc Publish message to Topic. +-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). +publish(Topic, Msg = #mqtt_message{from = From}) -> + trace(publish, From, Msg), + case emqttd_hook:run('message.publish', [], Msg) of + {ok, Msg1 = #mqtt_message{topic = Topic}} -> + %% Retain message first. Don't create retained topic. + Msg2 = case emqttd_retainer:retain(Msg1) of + ok -> emqttd_message:unset_flag(Msg1); + ignore -> Msg1 + end, + emqttd_pubsub:publish(Topic, Msg2); + {stop, Msg1} -> + lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]), + ignore + end. + +trace(publish, From, _Msg) when is_atom(From) -> + %% Dont' trace '$SYS' publish + ignore; + +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:info([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + +%% @doc Unsubscribe +-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()). +unsubscribe(Topic) when is_binary(Topic) -> + unsubscribe(Topic, self()). + +%% @doc Unsubscribe +-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()). +unsubscribe(Topic, Subscriber) when is_binary(Topic) -> + call(pick(Subscriber), {unsubscribe, Topic, Subscriber}). + +%% @doc Async Unsubscribe +-spec(async_unsubscribe(binary()) -> ok). +async_unsubscribe(Topic) when is_binary(Topic) -> + async_unsubscribe(Topic, self()). + +-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok). +async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> + cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}). + +setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> + call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). + +-spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()). +is_subscribed(Topic, Subscriber) when is_binary(Topic) -> + ets:member(mqtt_subproperty, {Topic, Subscriber}). + +-spec(get_subscriptions(emqttd:subscriber()) -> [{binary(), list()}]). +get_subscriptions(Subscriber) -> + lists:map(fun({_, Topic}) -> + subscription(Topic, Subscriber) + end, ets:lookup(mqtt_subscription, Subscriber)). + +subscription(Topic, Subscriber) -> + {Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}. + +subscriber_down(Subscriber) -> + cast(pick(Subscriber), {subscriber_down, Subscriber}). + +call(Server, Req) -> + gen_server2:call(Server, Req, infinity). + +cast(Server, Msg) when is_pid(Server) -> + gen_server2:cast(Server, Msg). + +pick(Subscriber) -> + gproc_pool:pick_worker(server, Subscriber). + +dump() -> + [{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_subproperty, mqtt_subscription, mqtt_subscriber]]. + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id, Env]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}. + +handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> + case subscribe_(Topic, Subscriber, Options, State) of + {ok, NewState} -> {reply, ok, setstats(NewState)}; + {error, Error} -> {reply, {error, Error}, State} + end; + +handle_call({unsubscribe, Topic, Subscriber}, _From, State) -> + case unsubscribe_(Topic, Subscriber, State) of + {ok, NewState} -> {reply, ok, setstats(NewState), hibernate}; + {error, Error} -> {reply, {error, Error}, State} + end; + +handle_call({setqos, Topic, Subscriber, Qos}, _From, State) -> + Key = {Topic, Subscriber}, + case ets:lookup(mqtt_subproperty, Key) of + [{_, Opts}] -> + Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts), + ets:insert(mqtt_subproperty, {Key, Opts1}), + {reply, ok, State}; + [] -> + {reply, {error, {subscription_not_found, Topic}}, State} + end; + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast({subscribe, Topic, Subscriber, Options}, State) -> + case subscribe_(Topic, Subscriber, Options, State) of + {ok, NewState} -> {noreply, setstats(NewState)}; + {error, _Error} -> {noreply, State} + end; + +handle_cast({unsubscribe, Topic, Subscriber}, State) -> + case unsubscribe_(Topic, Subscriber, State) of + {ok, NewState} -> {noreply, setstats(NewState), hibernate}; + {error, _Error} -> {noreply, State} + end; + +handle_cast({subscriber_down, Subscriber}, State) -> + subscriber_down_(Subscriber), + {noreply, setstats(State)}; + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) -> + subscriber_down_(DownPid), + {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate}; + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +subscribe_(Topic, Subscriber, Options, State) -> + case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of + [] -> + emqttd_pubsub:async_subscribe(Topic, Subscriber), + ets:insert(mqtt_subscription, {Subscriber, Topic}), + ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}), + {ok, monitor_subpid(Subscriber, State)}; + [_] -> + {error, {already_subscribed, Topic}} + end. + +unsubscribe_(Topic, Subscriber, State) -> + case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of + [_] -> + emqttd_pubsub:async_unsubscribe(Topic, Subscriber), + ets:delete_object(mqtt_subscription, {Subscriber, Topic}), + ets:delete(mqtt_subproperty, {Topic, Subscriber}), + {ok, case ets:member(mqtt_subscription, Subscriber) of + true -> State; + false -> demonitor_subpid(Subscriber, State) + end}; + [] -> + {error, {subscription_not_found, Topic}} + end. + +monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:monitor(SubPid)}; +monitor_subpid(_SubPid, State) -> + State. + +demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) -> + State#state{submon = PMon:demonitor(SubPid)}; +demonitor_subpid(_SubPid, State) -> + State. + +subscriber_down_(Subscriber) -> + lists:foreach(fun({_, Topic}) -> + subscriber_down_(Subscriber, Topic) + end, ets:lookup(mqtt_subscription, Subscriber)), + ets:delete(mqtt_subscription, Subscriber). + +subscriber_down_(Subscriber, Topic) -> + case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of + [] -> + %% here? + emqttd_pubsub:async_unsubscribe(Topic, Subscriber); + [_] -> + emqttd_pubsub:async_unsubscribe(Topic, Subscriber), + ets:delete(mqtt_subproperty, {Topic, Subscriber}) + end. + +setstats(State) -> + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', + ets:info(mqtt_subscription, size)), State. + diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 63feb4617..f27ba605b 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -232,7 +232,7 @@ init([CleanSess, ClientId, ClientPid]) -> expired_after = get_value(expired_after, SessEnv) * 60, collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, - emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), + emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)), %% Start statistics {ok, start_collector(Session), hibernate}. @@ -297,7 +297,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), SubDict; {ok, OldQos} -> - emqttd_pubsub:setqos(Topic, ClientId, Qos), + emqttd_server:setqos(Topic, ClientId, Qos), ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), dict:store(Topic, Qos, SubDict); error -> @@ -385,8 +385,8 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C if CleanSess =:= true -> ?LOG(warning, "CleanSess changed to false.", [], Session), - emqttd_sm:unregister_session(CleanSess, ClientId), - emqttd_sm:register_session(false, ClientId, sess_info(Session1)); + %% emqttd_sm:unregister_session(CleanSess, ClientId), + emqttd_sm:register_session(ClientId, false, sess_info(Session1)); CleanSess =:= false -> ok end, @@ -500,7 +500,7 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = end; handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> - emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), + emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)), hibernate(start_collector(Session)); handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, @@ -531,10 +531,10 @@ handle_info(expired, Session) -> handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). -terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> +terminate(_Reason, #session{client_id = ClientId}) -> %%TODO: ... - emqttd_pubsub:subscriber_down(ClientId), - emqttd_sm:unregister_session(CleanSess, ClientId). + emqttd_server:subscriber_down(ClientId), + emqttd_sm:unregister_session(ClientId). code_change(_OldVsn, Session, _Extra) -> {ok, Session}. diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index de23d3702..a7cfb1724 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -91,8 +91,8 @@ lookup_session(ClientId) -> end. %% @doc Register a session with info. --spec(register_session(boolean(), binary(), [tuple()]) -> true). -register_session(CleanSess, ClientId, Properties) -> +-spec(register_session(binary(), boolean(), [tuple()]) -> true). +register_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session.