server -> pubsub -> router
This commit is contained in:
parent
91e96e738d
commit
38daaa2f5c
|
@ -70,7 +70,7 @@
|
||||||
%% MQTT Subscription
|
%% MQTT Subscription
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-record(mqtt_subscription, {
|
-record(mqtt_subscription, {
|
||||||
subid :: binary() | atom(),
|
subid :: binary() | atom() | pid(),
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
qos = 0 :: 0 | 1 | 2
|
qos = 0 :: 0 | 1 | 2
|
||||||
}).
|
}).
|
||||||
|
@ -119,10 +119,11 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% MQTT Session
|
%% MQTT Session
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-record(mqtt_session, {
|
-record(mqtt_session, {
|
||||||
client_id :: binary(),
|
client_id :: binary(),
|
||||||
sess_pid :: pid(),
|
sess_pid :: pid(),
|
||||||
persistent :: boolean()
|
persistent :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(mqtt_session() :: #mqtt_session{}).
|
-type(mqtt_session() :: #mqtt_session{}).
|
||||||
|
|
|
@ -95,24 +95,12 @@ subscribe(Topic, Subscriber) ->
|
||||||
|
|
||||||
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()).
|
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()).
|
||||||
subscribe(Topic, Subscriber, Options) ->
|
subscribe(Topic, Subscriber, Options) ->
|
||||||
with_pubsub(fun(PubSub) -> PubSub:subscribe(iolist_to_binary(Topic), Subscriber, Options) end).
|
emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options).
|
||||||
|
|
||||||
%% @doc Publish MQTT Message
|
%% @doc Publish MQTT Message
|
||||||
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
|
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
|
||||||
publish(Msg = #mqtt_message{from = From}) ->
|
publish(Msg = #mqtt_message{topic = Topic}) ->
|
||||||
trace(publish, From, Msg),
|
emqttd_server:publish(Topic, Msg).
|
||||||
case run_hooks('message.publish', [], Msg) of
|
|
||||||
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
|
|
||||||
%% Retain message first. Don't create retained topic.
|
|
||||||
Msg2 = case emqttd_retainer:retain(Msg1) of
|
|
||||||
ok -> emqttd_message:unset_flag(Msg1);
|
|
||||||
ignore -> Msg1
|
|
||||||
end,
|
|
||||||
with_pubsub(fun(PubSub) -> PubSub:publish(Topic, Msg2) end);
|
|
||||||
{stop, Msg1} ->
|
|
||||||
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]),
|
|
||||||
ignore
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Unsubscribe
|
%% @doc Unsubscribe
|
||||||
-spec(unsubscribe(iodata()) -> ok | pubsub_error()).
|
-spec(unsubscribe(iodata()) -> ok | pubsub_error()).
|
||||||
|
@ -121,22 +109,18 @@ unsubscribe(Topic) ->
|
||||||
|
|
||||||
-spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()).
|
-spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()).
|
||||||
unsubscribe(Topic, Subscriber) ->
|
unsubscribe(Topic, Subscriber) ->
|
||||||
with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end).
|
emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber).
|
||||||
|
|
||||||
-spec(topics() -> [binary()]).
|
-spec(topics() -> [binary()]).
|
||||||
topics() -> emqttd_router:topics().
|
topics() -> emqttd_router:topics().
|
||||||
|
|
||||||
-spec(subscribers(iodata()) -> list(subscriber())).
|
-spec(subscribers(iodata()) -> list(subscriber())).
|
||||||
subscribers(Topic) ->
|
subscribers(Topic) ->
|
||||||
emqttd_dispatcher:subscribers(Topic).
|
emqttd_pubsub:subscribers(iolist_to_binary(Topic)).
|
||||||
|
|
||||||
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
|
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
|
||||||
subscriptions(Subscriber) ->
|
subscriptions(Subscriber) ->
|
||||||
with_pubsub(fun(PubSub) -> PubSub:subscriptions(Subscriber) end).
|
emqttd_server:get_subscriptions(Subscriber).
|
||||||
|
|
||||||
with_pubsub(Fun) -> {ok, PubSub} = conf(pubsub_adapter), Fun(PubSub).
|
|
||||||
|
|
||||||
dump() -> with_pubsub(fun(PubSub) -> lists:append(PubSub:dump(), emqttd_router:dump()) end).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Hooks API
|
%% Hooks API
|
||||||
|
@ -158,15 +142,9 @@ unhook(Hook, Function) ->
|
||||||
run_hooks(Hook, Args, Acc) ->
|
run_hooks(Hook, Args, Acc) ->
|
||||||
emqttd_hook:run(Hook, Args, Acc).
|
emqttd_hook:run(Hook, Args, Acc).
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Trace Functions
|
%% Debug
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
trace(publish, From, _Msg) when is_atom(From) ->
|
dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]).
|
||||||
%% Dont' trace '$SYS' publish
|
|
||||||
ignore;
|
|
||||||
|
|
||||||
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
|
||||||
lager:info([{client, From}, {topic, Topic}],
|
|
||||||
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
|
||||||
|
|
||||||
|
|
|
@ -1,150 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqttd_dispatcher).
|
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
|
||||||
|
|
||||||
-behaviour(gen_server2).
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
|
||||||
|
|
||||||
%% API Exports
|
|
||||||
-export([start_link/3, subscribe/2, unsubscribe/2, dispatch/2,
|
|
||||||
async_subscribe/2, async_unsubscribe/2]).
|
|
||||||
|
|
||||||
-export([subscribers/1]).
|
|
||||||
|
|
||||||
%% gen_server.
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
||||||
terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-record(state, {pool, id, env}).
|
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
|
||||||
start_link(Pool, Id, Env) ->
|
|
||||||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
|
||||||
|
|
||||||
-spec(subscribe(binary(), emqttd:subscriber()) -> ok).
|
|
||||||
subscribe(Topic, Subscriber) ->
|
|
||||||
call(pick(Topic), {subscribe, Topic, Subscriber}).
|
|
||||||
|
|
||||||
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
|
|
||||||
async_subscribe(Topic, Subscriber) ->
|
|
||||||
cast(pick(Topic), {subscribe, Topic, Subscriber}).
|
|
||||||
|
|
||||||
%% @doc Dispatch Message to Subscribers
|
|
||||||
-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()).
|
|
||||||
dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
|
|
||||||
case subscribers(Topic) of
|
|
||||||
[] ->
|
|
||||||
dropped(Topic), {ok, Delivery};
|
|
||||||
[Sub] -> %% optimize?
|
|
||||||
dispatch(Sub, Topic, Msg),
|
|
||||||
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}};
|
|
||||||
Subscribers ->
|
|
||||||
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
|
|
||||||
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
|
|
||||||
{ok, Delivery#mqtt_delivery{flows = Flows1}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
dispatch(Pid, Topic, Msg) when is_pid(Pid) ->
|
|
||||||
Pid ! {dispatch, Topic, Msg};
|
|
||||||
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
|
|
||||||
emqttd_sm:dispatch(SubId, Topic, Msg).
|
|
||||||
|
|
||||||
subscribers(Topic) ->
|
|
||||||
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Ingore $SYS Messages.
|
|
||||||
dropped(<<"$SYS/", _/binary>>) ->
|
|
||||||
ok;
|
|
||||||
dropped(_Topic) ->
|
|
||||||
emqttd_metrics:inc('messages/dropped').
|
|
||||||
|
|
||||||
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
|
||||||
unsubscribe(Topic, Subscriber) ->
|
|
||||||
call(pick(Topic), {unsubscribe, Topic, Subscriber}).
|
|
||||||
|
|
||||||
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
|
||||||
async_unsubscribe(Topic, Subscriber) ->
|
|
||||||
cast(pick(Topic), {unsubscribe, Topic, Subscriber}).
|
|
||||||
|
|
||||||
call(Server, Req) ->
|
|
||||||
gen_server2:call(Server, Req, infinity).
|
|
||||||
|
|
||||||
cast(Server, Msg) ->
|
|
||||||
gen_server2:cast(Server, Msg).
|
|
||||||
|
|
||||||
pick(Topic) ->
|
|
||||||
gproc_pool:pick_worker(dispatcher, Topic).
|
|
||||||
|
|
||||||
init([Pool, Id, Env]) ->
|
|
||||||
?GPROC_POOL(join, Pool, Id),
|
|
||||||
{ok, #state{pool = Pool, id = Id, env = Env}}.
|
|
||||||
|
|
||||||
handle_call({subscribe, Topic, Subscriber}, _From, State) ->
|
|
||||||
add_subscriber_(Topic, Subscriber),
|
|
||||||
{reply, ok, State};
|
|
||||||
|
|
||||||
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
|
||||||
del_subscriber_(Topic, Subscriber),
|
|
||||||
{reply, ok, State};
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
|
||||||
?UNEXPECTED_REQ(Req, State).
|
|
||||||
|
|
||||||
handle_cast({subscribe, Topic, Subscriber}, State) ->
|
|
||||||
add_subscriber_(Topic, Subscriber),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
|
||||||
del_subscriber_(Topic, Subscriber),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
?UNEXPECTED_INFO(Info, State).
|
|
||||||
|
|
||||||
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
|
||||||
?GPROC_POOL(leave, Pool, Id).
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internel Functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
add_subscriber_(Topic, Subscriber) ->
|
|
||||||
case ets:member(subscriber, Topic) of
|
|
||||||
false -> emqttd_router:add_route(Topic, node());
|
|
||||||
true -> ok
|
|
||||||
end,
|
|
||||||
ets:insert(subscriber, {Topic, Subscriber}).
|
|
||||||
|
|
||||||
del_subscriber_(Topic, Subscriber) ->
|
|
||||||
ets:delete_object(subscriber, {Topic, Subscriber}),
|
|
||||||
case ets:member(subscriber, Topic) of
|
|
||||||
false -> emqttd_router:del_route(Topic, node());
|
|
||||||
true -> ok
|
|
||||||
end.
|
|
||||||
|
|
|
@ -16,93 +16,46 @@
|
||||||
|
|
||||||
-module(emqttd_pubsub).
|
-module(emqttd_pubsub).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
|
||||||
|
|
||||||
-behaviour(gen_server2).
|
-behaviour(gen_server2).
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
%% Start
|
%% API Exports
|
||||||
-export([start_link/3]).
|
-export([start_link/3, subscribe/2, unsubscribe/2, publish/2,
|
||||||
|
async_subscribe/2, async_unsubscribe/2]).
|
||||||
|
|
||||||
%% PubSub API.
|
-export([subscribers/1]).
|
||||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/2,
|
|
||||||
unsubscribe/1, unsubscribe/2]).
|
|
||||||
|
|
||||||
%% Async PubSub API.
|
|
||||||
-export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
|
|
||||||
async_unsubscribe/1, async_unsubscribe/2]).
|
|
||||||
|
|
||||||
-export([subscriber_down/1]).
|
|
||||||
|
|
||||||
%% Management API.
|
|
||||||
-export([setqos/3, is_subscribed/2, subscriptions/1]).
|
|
||||||
|
|
||||||
%% Debug API
|
|
||||||
-export([dump/0]).
|
|
||||||
|
|
||||||
%% gen_server.
|
%% gen_server.
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}).
|
-record(state, {pool, id, env}).
|
||||||
|
|
||||||
-define(PUBSUB, ?MODULE).
|
-define(PUBSUB, ?MODULE).
|
||||||
|
|
||||||
-define(Dispatcher, emqttd_dispatcher).
|
|
||||||
|
|
||||||
%% @doc Start a pubsub server
|
|
||||||
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
||||||
start_link(Pool, Id, Env) ->
|
start_link(Pool, Id, Env) ->
|
||||||
gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []).
|
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
-spec(subscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
%% PubSub API
|
subscribe(Topic, Subscriber) ->
|
||||||
%%--------------------------------------------------------------------
|
call(pick(Topic), {subscribe, Topic, Subscriber}).
|
||||||
|
|
||||||
%% @doc Subscribe a Topic
|
|
||||||
-spec(subscribe(binary()) -> ok | emqttd:pubsub_error()).
|
|
||||||
subscribe(Topic) when is_binary(Topic) ->
|
|
||||||
subscribe(Topic, self()).
|
|
||||||
|
|
||||||
-spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
|
||||||
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
||||||
subscribe(Topic, Subscriber, []).
|
|
||||||
|
|
||||||
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
|
|
||||||
ok | emqttd:pubsub_error()).
|
|
||||||
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
|
||||||
call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
|
||||||
|
|
||||||
%% @doc Subscribe a Topic Asynchronously
|
|
||||||
-spec(async_subscribe(binary()) -> ok).
|
|
||||||
async_subscribe(Topic) when is_binary(Topic) ->
|
|
||||||
async_subscribe(Topic, self()).
|
|
||||||
|
|
||||||
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
|
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
async_subscribe(Topic, Subscriber) ->
|
||||||
async_subscribe(Topic, Subscriber, []).
|
cast(pick(Topic), {subscribe, Topic, Subscriber}).
|
||||||
|
|
||||||
-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
|
|
||||||
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
|
||||||
cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
|
||||||
|
|
||||||
subscriber_down(Subscriber) ->
|
|
||||||
cast(pick(Subscriber), {down, Subscriber}).
|
|
||||||
|
|
||||||
%% @doc Publish message to Topic.
|
|
||||||
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
|
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
|
||||||
publish(Topic, Msg) when is_binary(Topic) ->
|
publish(Topic, Msg) ->
|
||||||
route(emqttd_router:match(Topic), delivery(Msg)).
|
route(emqttd_router:match(Topic), delivery(Msg)).
|
||||||
|
|
||||||
%% Dispatch on the local node
|
%% Dispatch on the local node
|
||||||
route([#mqtt_route{topic = To, node = Node}],
|
route([#mqtt_route{topic = To, node = Node}],
|
||||||
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
|
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
|
||||||
?Dispatcher:dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
|
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
|
||||||
|
|
||||||
%% Forward to other nodes
|
%% Forward to other nodes
|
||||||
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
|
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
|
||||||
|
@ -117,110 +70,81 @@ delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}.
|
||||||
|
|
||||||
%% @doc Forward message to another node...
|
%% @doc Forward message to another node...
|
||||||
forward(Node, To, Delivery) ->
|
forward(Node, To, Delivery) ->
|
||||||
rpc:cast(Node, ?Dispatcher, dispatch, [To, Delivery]), {ok, Delivery}.
|
rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}.
|
||||||
|
|
||||||
subscriptions(Subscriber) ->
|
%% @doc Dispatch Message to Subscribers
|
||||||
lists:map(fun({_, Topic}) ->
|
-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()).
|
||||||
subscription(Topic, Subscriber)
|
dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
|
||||||
end, ets:lookup(mqtt_subscription, Subscriber)).
|
case subscribers(Topic) of
|
||||||
|
[] ->
|
||||||
|
dropped(Topic), {ok, Delivery};
|
||||||
|
[Sub] -> %% optimize?
|
||||||
|
dispatch(Sub, Topic, Msg),
|
||||||
|
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1} | Flows]}};
|
||||||
|
Subscribers ->
|
||||||
|
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
|
||||||
|
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
|
||||||
|
{ok, Delivery#mqtt_delivery{flows = Flows1}}
|
||||||
|
end.
|
||||||
|
|
||||||
subscription(Topic, Subscriber) ->
|
dispatch(Pid, Topic, Msg) when is_pid(Pid) ->
|
||||||
{Topic, ets:lookup_element(mqtt_pubsub, {Topic, Subscriber}, 2)}.
|
Pid ! {dispatch, Topic, Msg};
|
||||||
|
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
|
||||||
|
emqttd_sm:dispatch(SubId, Topic, Msg).
|
||||||
|
|
||||||
is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
|
subscribers(Topic) ->
|
||||||
ets:member(mqtt_pubsub, {Topic, Subscriber}).
|
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
|
||||||
|
|
||||||
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
%% @private
|
||||||
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
%% @doc Ingore $SYS Messages.
|
||||||
|
dropped(<<"$SYS/", _/binary>>) ->
|
||||||
|
ok;
|
||||||
|
dropped(_Topic) ->
|
||||||
|
emqttd_metrics:inc('messages/dropped').
|
||||||
|
|
||||||
dump() ->
|
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
[{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_pubsub, mqtt_subscription, mqtt_subscriber]].
|
unsubscribe(Topic, Subscriber) ->
|
||||||
|
call(pick(Topic), {unsubscribe, Topic, Subscriber}).
|
||||||
%% @doc Unsubscribe
|
|
||||||
-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
|
|
||||||
unsubscribe(Topic) when is_binary(Topic) ->
|
|
||||||
unsubscribe(Topic, self()).
|
|
||||||
|
|
||||||
%% @doc Unsubscribe
|
|
||||||
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
|
||||||
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
||||||
call(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
|
||||||
|
|
||||||
%% @doc Async Unsubscribe
|
|
||||||
-spec(async_unsubscribe(binary()) -> ok).
|
|
||||||
async_unsubscribe(Topic) when is_binary(Topic) ->
|
|
||||||
async_unsubscribe(Topic, self()).
|
|
||||||
|
|
||||||
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
async_unsubscribe(Topic, Subscriber) ->
|
||||||
cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
cast(pick(Topic), {unsubscribe, Topic, Subscriber}).
|
||||||
|
|
||||||
call(PubSub, Req) when is_pid(PubSub) ->
|
call(Server, Req) ->
|
||||||
gen_server2:call(PubSub, Req, infinity).
|
gen_server2:call(Server, Req, infinity).
|
||||||
|
|
||||||
cast(PubSub, Msg) when is_pid(PubSub) ->
|
cast(Server, Msg) ->
|
||||||
gen_server2:cast(PubSub, Msg).
|
gen_server2:cast(Server, Msg).
|
||||||
|
|
||||||
pick(Subscriber) ->
|
pick(Topic) ->
|
||||||
gproc_pool:pick_worker(pubsub, Subscriber).
|
gproc_pool:pick_worker(pubsub, Topic).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% gen_server Callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
init([Pool, Id, Env]) ->
|
init([Pool, Id, Env]) ->
|
||||||
?GPROC_POOL(join, Pool, Id),
|
?GPROC_POOL(join, Pool, Id),
|
||||||
{ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
{ok, #state{pool = Pool, id = Id, env = Env}}.
|
||||||
|
|
||||||
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
handle_call({subscribe, Topic, Subscriber}, _From, State) ->
|
||||||
case do_subscribe(Topic, Subscriber, Options, State) of
|
add_subscriber_(Topic, Subscriber),
|
||||||
{ok, NewState} -> {reply, ok, setstats(NewState)};
|
{reply, ok, setstats(State)};
|
||||||
{error, Error} -> {reply, {error, Error}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
||||||
case do_unsubscribe(Topic, Subscriber, State) of
|
del_subscriber_(Topic, Subscriber),
|
||||||
{ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
{reply, ok, setstats(State)};
|
||||||
{error, Error} -> {reply, {error, Error}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
|
|
||||||
Key = {Topic, Subscriber},
|
|
||||||
case ets:lookup(mqtt_pubsub, Key) of
|
|
||||||
[{_, Opts}] ->
|
|
||||||
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
|
|
||||||
ets:insert(mqtt_pubsub, {Key, Opts1}),
|
|
||||||
{reply, ok, State};
|
|
||||||
[] ->
|
|
||||||
{reply, {error, {subscription_not_found, Topic}}, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?UNEXPECTED_REQ(Req, State).
|
?UNEXPECTED_REQ(Req, State).
|
||||||
|
|
||||||
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
handle_cast({subscribe, Topic, Subscriber}, State) ->
|
||||||
case do_subscribe(Topic, Subscriber, Options, State) of
|
add_subscriber_(Topic, Subscriber),
|
||||||
{ok, NewState} -> {noreply, setstats(NewState)};
|
{noreply, setstats(State)};
|
||||||
{error, _Error} -> {noreply, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
||||||
case do_unsubscribe(Topic, Subscriber, State) of
|
del_subscriber_(Topic, Subscriber),
|
||||||
{ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
{noreply, setstats(State)};
|
||||||
{error, _Error} -> {noreply, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_cast({down, Subscriber}, State) ->
|
|
||||||
subscriber_down_(Subscriber),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
|
||||||
subscriber_down_(DownPid),
|
|
||||||
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?UNEXPECTED_INFO(Info, State).
|
?UNEXPECTED_INFO(Info, State).
|
||||||
|
|
||||||
|
@ -228,71 +152,27 @@ terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||||
?GPROC_POOL(leave, Pool, Id).
|
?GPROC_POOL(leave, Pool, Id).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internel Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
do_subscribe(Topic, Subscriber, Options, State) ->
|
add_subscriber_(Topic, Subscriber) ->
|
||||||
case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of
|
case ets:member(mqtt_subscriber, Topic) of
|
||||||
[] ->
|
false -> emqttd_router:add_route(Topic, node());
|
||||||
?Dispatcher:async_subscribe(Topic, Subscriber),
|
true -> ok
|
||||||
add_subscription(Subscriber, Topic),
|
end,
|
||||||
ets:insert(mqtt_pubsub, {{Topic, Subscriber}, Options}),
|
ets:insert(subscriber, {Topic, Subscriber}).
|
||||||
{ok, monitor_subpid(Subscriber, State)};
|
|
||||||
[_] ->
|
del_subscriber_(Topic, Subscriber) ->
|
||||||
{error, {already_subscribed, Topic}}
|
ets:delete_object(mqtt_subscriber, {Topic, Subscriber}),
|
||||||
|
case ets:member(mqtt_subscriber, Topic) of
|
||||||
|
false -> emqttd_router:del_route(Topic, node());
|
||||||
|
true -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_subscription(Subscriber, Topic) ->
|
|
||||||
ets:insert(mqtt_subscription, {Subscriber, Topic}).
|
|
||||||
|
|
||||||
do_unsubscribe(Topic, Subscriber, State) ->
|
|
||||||
case ets:lookup(mqtt_pubsub, {Topic, Subscriber}) of
|
|
||||||
[_] ->
|
|
||||||
?Dispatcher:async_unsubscribe(Topic, Subscriber),
|
|
||||||
del_subscription(Subscriber, Topic),
|
|
||||||
ets:delete(mqtt_pubsub, {Topic, Subscriber}),
|
|
||||||
{ok, case ets:member(mqtt_subscription, Subscriber) of
|
|
||||||
true -> State;
|
|
||||||
false -> demonitor_subpid(Subscriber, State)
|
|
||||||
end};
|
|
||||||
[] ->
|
|
||||||
{error, {subscription_not_found, Topic}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
del_subscription(Subscriber, Topic) ->
|
|
||||||
ets:delete_object(mqtt_subscription, {Subscriber, Topic}).
|
|
||||||
|
|
||||||
subscriber_down_(Subscriber) ->
|
|
||||||
lists:foreach(fun({_, Topic}) ->
|
|
||||||
subscriber_down_(Subscriber, Topic)
|
|
||||||
end, ets:lookup(mqtt_subscription, Subscriber)),
|
|
||||||
ets:delete(mqtt_subscription, Subscriber).
|
|
||||||
|
|
||||||
subscriber_down_(DownPid, Topic) ->
|
|
||||||
case ets:lookup(mqtt_pubsub, {Topic, DownPid}) of
|
|
||||||
[] ->
|
|
||||||
%% here?
|
|
||||||
?Dispatcher:async_unsubscribe(Topic, DownPid);
|
|
||||||
[_] ->
|
|
||||||
?Dispatcher:async_unsubscribe(Topic, DownPid),
|
|
||||||
ets:delete(mqtt_pubsub, {Topic, DownPid})
|
|
||||||
end.
|
|
||||||
|
|
||||||
monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
|
||||||
State#state{submon = PMon:monitor(SubPid)};
|
|
||||||
monitor_subpid(_SubPid, State) ->
|
|
||||||
State.
|
|
||||||
|
|
||||||
demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
|
||||||
State#state{submon = PMon:demonitor(SubPid)};
|
|
||||||
demonitor_subpid(_SubPid, State) ->
|
|
||||||
State.
|
|
||||||
|
|
||||||
setstats(State) when is_record(State, state) ->
|
setstats(State) when is_record(State, state) ->
|
||||||
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)),
|
emqttd_stats:setstats('subscribers/count', 'subscribers/max',
|
||||||
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(mqtt_subscription, size)),
|
ets:info(mqtt_subscriber, size)).
|
||||||
State.
|
|
||||||
|
|
||||||
|
|
|
@ -43,18 +43,17 @@ pubsub_pool() ->
|
||||||
|
|
||||||
init([Env]) ->
|
init([Env]) ->
|
||||||
%% Create ETS Tables
|
%% Create ETS Tables
|
||||||
[create_tab(Tab) || Tab <- [mqtt_pubsub, mqtt_subscriber, mqtt_subscription]],
|
[create_tab(Tab) || Tab <- [mqtt_subpropery, mqtt_subscriber, mqtt_subscription]],
|
||||||
|
|
||||||
%% Dispatcher Pool
|
|
||||||
DispatcherMFA = {emqttd_dispatcher, start_link, [Env]},
|
|
||||||
DispatcherPool = pool_sup(dispatcher, Env, DispatcherMFA),
|
|
||||||
|
|
||||||
%% PubSub Pool
|
%% PubSub Pool
|
||||||
{ok, PubSub} = emqttd:conf(pubsub_adapter),
|
PubSubMFA = {emqttd_pubsub, start_link, [Env]},
|
||||||
PubSubMFA = {PubSub, start_link, [Env]},
|
|
||||||
PubSubPool = pool_sup(pubsub, Env, PubSubMFA),
|
PubSubPool = pool_sup(pubsub, Env, PubSubMFA),
|
||||||
|
|
||||||
{ok, { {one_for_all, 10, 3600}, [DispatcherPool, PubSubPool]} }.
|
%% Server Pool
|
||||||
|
ServerMFA = {emqttd_server, start_link, [Env]},
|
||||||
|
ServerPool = pool_sup(server, Env, ServerMFA),
|
||||||
|
|
||||||
|
{ok, { {one_for_all, 10, 3600}, [PubSubPool, ServerPool]} }.
|
||||||
|
|
||||||
pool_size(Env) ->
|
pool_size(Env) ->
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
|
@ -68,9 +67,9 @@ pool_sup(Name, Env, MFA) ->
|
||||||
%% Create PubSub Tables
|
%% Create PubSub Tables
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
create_tab(mqtt_pubsub) ->
|
create_tab(mqtt_subproperty) ->
|
||||||
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
|
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
|
||||||
ensure_tab(mqtt_pubsub, [public, named_table, set | ?CONCURRENCY_OPTS]);
|
ensure_tab(mqtt_subproperty, [public, named_table, set | ?CONCURRENCY_OPTS]);
|
||||||
|
|
||||||
create_tab(mqtt_subscriber) ->
|
create_tab(mqtt_subscriber) ->
|
||||||
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
|
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
|
||||||
|
|
|
@ -0,0 +1,286 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqttd_server).
|
||||||
|
|
||||||
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
|
-behaviour(gen_server2).
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
|
-export([start_link/3]).
|
||||||
|
|
||||||
|
%% PubSub API.
|
||||||
|
-export([subscribe/1, subscribe/2, subscribe/3, publish/2,
|
||||||
|
unsubscribe/1, unsubscribe/2]).
|
||||||
|
|
||||||
|
%% Async PubSub API.
|
||||||
|
-export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
|
||||||
|
async_unsubscribe/1, async_unsubscribe/2]).
|
||||||
|
|
||||||
|
%% Management API.
|
||||||
|
-export([setqos/3, is_subscribed/2, get_subscriptions/1, subscriber_down/1]).
|
||||||
|
|
||||||
|
%% Debug API
|
||||||
|
-export([dump/0]).
|
||||||
|
|
||||||
|
%% gen_server.
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}).
|
||||||
|
|
||||||
|
%% @doc Start server
|
||||||
|
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}).
|
||||||
|
start_link(Pool, Id, Env) ->
|
||||||
|
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% PubSub API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Subscribe a Topic
|
||||||
|
-spec(subscribe(binary()) -> ok | emqttd:pubsub_error()).
|
||||||
|
subscribe(Topic) when is_binary(Topic) ->
|
||||||
|
subscribe(Topic, self()).
|
||||||
|
|
||||||
|
-spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
||||||
|
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
|
subscribe(Topic, Subscriber, []).
|
||||||
|
|
||||||
|
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
|
||||||
|
ok | emqttd:pubsub_error()).
|
||||||
|
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
||||||
|
call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
||||||
|
|
||||||
|
%% @doc Subscribe a Topic Asynchronously
|
||||||
|
-spec(async_subscribe(binary()) -> ok).
|
||||||
|
async_subscribe(Topic) when is_binary(Topic) ->
|
||||||
|
async_subscribe(Topic, self()).
|
||||||
|
|
||||||
|
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
|
async_subscribe(Topic, Subscriber, []).
|
||||||
|
|
||||||
|
-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
|
||||||
|
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
||||||
|
cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
||||||
|
|
||||||
|
%% @doc Publish message to Topic.
|
||||||
|
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
|
||||||
|
publish(Topic, Msg = #mqtt_message{from = From}) ->
|
||||||
|
trace(publish, From, Msg),
|
||||||
|
case emqttd_hook:run('message.publish', [], Msg) of
|
||||||
|
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
|
||||||
|
%% Retain message first. Don't create retained topic.
|
||||||
|
Msg2 = case emqttd_retainer:retain(Msg1) of
|
||||||
|
ok -> emqttd_message:unset_flag(Msg1);
|
||||||
|
ignore -> Msg1
|
||||||
|
end,
|
||||||
|
emqttd_pubsub:publish(Topic, Msg2);
|
||||||
|
{stop, Msg1} ->
|
||||||
|
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]),
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
trace(publish, From, _Msg) when is_atom(From) ->
|
||||||
|
%% Dont' trace '$SYS' publish
|
||||||
|
ignore;
|
||||||
|
|
||||||
|
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
||||||
|
lager:info([{client, From}, {topic, Topic}],
|
||||||
|
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
||||||
|
|
||||||
|
%% @doc Unsubscribe
|
||||||
|
-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
|
||||||
|
unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
|
unsubscribe(Topic, self()).
|
||||||
|
|
||||||
|
%% @doc Unsubscribe
|
||||||
|
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
||||||
|
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
|
call(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
||||||
|
|
||||||
|
%% @doc Async Unsubscribe
|
||||||
|
-spec(async_unsubscribe(binary()) -> ok).
|
||||||
|
async_unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
|
async_unsubscribe(Topic, self()).
|
||||||
|
|
||||||
|
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
||||||
|
async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
|
cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
||||||
|
|
||||||
|
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
||||||
|
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
||||||
|
|
||||||
|
-spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()).
|
||||||
|
is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
|
ets:member(mqtt_subproperty, {Topic, Subscriber}).
|
||||||
|
|
||||||
|
-spec(get_subscriptions(emqttd:subscriber()) -> [{binary(), list()}]).
|
||||||
|
get_subscriptions(Subscriber) ->
|
||||||
|
lists:map(fun({_, Topic}) ->
|
||||||
|
subscription(Topic, Subscriber)
|
||||||
|
end, ets:lookup(mqtt_subscription, Subscriber)).
|
||||||
|
|
||||||
|
subscription(Topic, Subscriber) ->
|
||||||
|
{Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
|
||||||
|
|
||||||
|
subscriber_down(Subscriber) ->
|
||||||
|
cast(pick(Subscriber), {subscriber_down, Subscriber}).
|
||||||
|
|
||||||
|
call(Server, Req) ->
|
||||||
|
gen_server2:call(Server, Req, infinity).
|
||||||
|
|
||||||
|
cast(Server, Msg) when is_pid(Server) ->
|
||||||
|
gen_server2:cast(Server, Msg).
|
||||||
|
|
||||||
|
pick(Subscriber) ->
|
||||||
|
gproc_pool:pick_worker(server, Subscriber).
|
||||||
|
|
||||||
|
dump() ->
|
||||||
|
[{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_subproperty, mqtt_subscription, mqtt_subscriber]].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server Callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([Pool, Id, Env]) ->
|
||||||
|
?GPROC_POOL(join, Pool, Id),
|
||||||
|
{ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
||||||
|
|
||||||
|
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
||||||
|
case subscribe_(Topic, Subscriber, Options, State) of
|
||||||
|
{ok, NewState} -> {reply, ok, setstats(NewState)};
|
||||||
|
{error, Error} -> {reply, {error, Error}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
||||||
|
case unsubscribe_(Topic, Subscriber, State) of
|
||||||
|
{ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
||||||
|
{error, Error} -> {reply, {error, Error}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
|
||||||
|
Key = {Topic, Subscriber},
|
||||||
|
case ets:lookup(mqtt_subproperty, Key) of
|
||||||
|
[{_, Opts}] ->
|
||||||
|
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
|
||||||
|
ets:insert(mqtt_subproperty, {Key, Opts1}),
|
||||||
|
{reply, ok, State};
|
||||||
|
[] ->
|
||||||
|
{reply, {error, {subscription_not_found, Topic}}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?UNEXPECTED_REQ(Req, State).
|
||||||
|
|
||||||
|
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
||||||
|
case subscribe_(Topic, Subscriber, Options, State) of
|
||||||
|
{ok, NewState} -> {noreply, setstats(NewState)};
|
||||||
|
{error, _Error} -> {noreply, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
||||||
|
case unsubscribe_(Topic, Subscriber, State) of
|
||||||
|
{ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
||||||
|
{error, _Error} -> {noreply, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_cast({subscriber_down, Subscriber}, State) ->
|
||||||
|
subscriber_down_(Subscriber),
|
||||||
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
||||||
|
subscriber_down_(DownPid),
|
||||||
|
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?UNEXPECTED_INFO(Info, State).
|
||||||
|
|
||||||
|
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||||
|
?GPROC_POOL(leave, Pool, Id).
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal Functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
subscribe_(Topic, Subscriber, Options, State) ->
|
||||||
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
||||||
|
[] ->
|
||||||
|
emqttd_pubsub:async_subscribe(Topic, Subscriber),
|
||||||
|
ets:insert(mqtt_subscription, {Subscriber, Topic}),
|
||||||
|
ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}),
|
||||||
|
{ok, monitor_subpid(Subscriber, State)};
|
||||||
|
[_] ->
|
||||||
|
{error, {already_subscribed, Topic}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
unsubscribe_(Topic, Subscriber, State) ->
|
||||||
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
||||||
|
[_] ->
|
||||||
|
emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
|
||||||
|
ets:delete_object(mqtt_subscription, {Subscriber, Topic}),
|
||||||
|
ets:delete(mqtt_subproperty, {Topic, Subscriber}),
|
||||||
|
{ok, case ets:member(mqtt_subscription, Subscriber) of
|
||||||
|
true -> State;
|
||||||
|
false -> demonitor_subpid(Subscriber, State)
|
||||||
|
end};
|
||||||
|
[] ->
|
||||||
|
{error, {subscription_not_found, Topic}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
||||||
|
State#state{submon = PMon:monitor(SubPid)};
|
||||||
|
monitor_subpid(_SubPid, State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
||||||
|
State#state{submon = PMon:demonitor(SubPid)};
|
||||||
|
demonitor_subpid(_SubPid, State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
subscriber_down_(Subscriber) ->
|
||||||
|
lists:foreach(fun({_, Topic}) ->
|
||||||
|
subscriber_down_(Subscriber, Topic)
|
||||||
|
end, ets:lookup(mqtt_subscription, Subscriber)),
|
||||||
|
ets:delete(mqtt_subscription, Subscriber).
|
||||||
|
|
||||||
|
subscriber_down_(Subscriber, Topic) ->
|
||||||
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
||||||
|
[] ->
|
||||||
|
%% here?
|
||||||
|
emqttd_pubsub:async_unsubscribe(Topic, Subscriber);
|
||||||
|
[_] ->
|
||||||
|
emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
|
||||||
|
ets:delete(mqtt_subproperty, {Topic, Subscriber})
|
||||||
|
end.
|
||||||
|
|
||||||
|
setstats(State) ->
|
||||||
|
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
|
||||||
|
ets:info(mqtt_subscription, size)), State.
|
||||||
|
|
|
@ -232,7 +232,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
expired_after = get_value(expired_after, SessEnv) * 60,
|
expired_after = get_value(expired_after, SessEnv) * 60,
|
||||||
collect_interval = get_value(collect_interval, SessEnv, 0),
|
collect_interval = get_value(collect_interval, SessEnv, 0),
|
||||||
timestamp = os:timestamp()},
|
timestamp = os:timestamp()},
|
||||||
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
|
emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)),
|
||||||
%% Start statistics
|
%% Start statistics
|
||||||
{ok, start_collector(Session), hibernate}.
|
{ok, start_collector(Session), hibernate}.
|
||||||
|
|
||||||
|
@ -297,7 +297,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id =
|
||||||
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
|
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
|
||||||
SubDict;
|
SubDict;
|
||||||
{ok, OldQos} ->
|
{ok, OldQos} ->
|
||||||
emqttd_pubsub:setqos(Topic, ClientId, Qos),
|
emqttd_server:setqos(Topic, ClientId, Qos),
|
||||||
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
|
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
|
||||||
dict:store(Topic, Qos, SubDict);
|
dict:store(Topic, Qos, SubDict);
|
||||||
error ->
|
error ->
|
||||||
|
@ -385,8 +385,8 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C
|
||||||
if
|
if
|
||||||
CleanSess =:= true ->
|
CleanSess =:= true ->
|
||||||
?LOG(warning, "CleanSess changed to false.", [], Session),
|
?LOG(warning, "CleanSess changed to false.", [], Session),
|
||||||
emqttd_sm:unregister_session(CleanSess, ClientId),
|
%% emqttd_sm:unregister_session(CleanSess, ClientId),
|
||||||
emqttd_sm:register_session(false, ClientId, sess_info(Session1));
|
emqttd_sm:register_session(ClientId, false, sess_info(Session1));
|
||||||
CleanSess =:= false ->
|
CleanSess =:= false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
@ -500,7 +500,7 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp =
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
||||||
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
|
emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)),
|
||||||
hibernate(start_collector(Session));
|
hibernate(start_collector(Session));
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
||||||
|
@ -531,10 +531,10 @@ handle_info(expired, Session) ->
|
||||||
handle_info(Info, Session) ->
|
handle_info(Info, Session) ->
|
||||||
?UNEXPECTED_INFO(Info, Session).
|
?UNEXPECTED_INFO(Info, Session).
|
||||||
|
|
||||||
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
terminate(_Reason, #session{client_id = ClientId}) ->
|
||||||
%%TODO: ...
|
%%TODO: ...
|
||||||
emqttd_pubsub:subscriber_down(ClientId),
|
emqttd_server:subscriber_down(ClientId),
|
||||||
emqttd_sm:unregister_session(CleanSess, ClientId).
|
emqttd_sm:unregister_session(ClientId).
|
||||||
|
|
||||||
code_change(_OldVsn, Session, _Extra) ->
|
code_change(_OldVsn, Session, _Extra) ->
|
||||||
{ok, Session}.
|
{ok, Session}.
|
||||||
|
|
|
@ -91,8 +91,8 @@ lookup_session(ClientId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Register a session with info.
|
%% @doc Register a session with info.
|
||||||
-spec(register_session(boolean(), binary(), [tuple()]) -> true).
|
-spec(register_session(binary(), boolean(), [tuple()]) -> true).
|
||||||
register_session(CleanSess, ClientId, Properties) ->
|
register_session(ClientId, CleanSess, Properties) ->
|
||||||
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
|
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
|
||||||
|
|
||||||
%% @doc Unregister a session.
|
%% @doc Unregister a session.
|
||||||
|
|
Loading…
Reference in New Issue