improve the design of pubsub and router
This commit is contained in:
parent
e73df7f54f
commit
226933018a
|
@ -156,11 +156,12 @@
|
|||
%% MQTT Delivery
|
||||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_delivery, {
|
||||
message :: mqtt_message(), %% Message
|
||||
dispatched = [] :: list(),
|
||||
flow_through :: [node()]
|
||||
message :: mqtt_message(), %% Message
|
||||
flows :: list()
|
||||
}).
|
||||
|
||||
-type(mqtt_delivery() :: #mqtt_delivery{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Alarm
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
120
src/emqttd.erl
120
src/emqttd.erl
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqttd).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-include("emqttd_protocol.hrl").
|
||||
|
@ -23,19 +25,31 @@
|
|||
-export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]).
|
||||
|
||||
%% PubSub API
|
||||
-export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3,
|
||||
unsubscribe/1, unsubscribe/3]).
|
||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
|
||||
unsubscribe/1, unsubscribe/2]).
|
||||
|
||||
%% Route and Forward API
|
||||
%% -export([route/2, forward/2]).
|
||||
%% PubSub Management API
|
||||
-export([topics/0, subscribers/1, subscriptions/1]).
|
||||
|
||||
%% Hooks API
|
||||
-export([hook/4, hook/3, unhook/2, run_hooks/3]).
|
||||
|
||||
%% Debug API
|
||||
-export([dump/0]).
|
||||
|
||||
-type(subscriber() :: pid() | binary() | function()).
|
||||
|
||||
-type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
|
||||
|
||||
-type(pubsub_error() :: {error, {already_subscribed, binary()}
|
||||
| {subscription_not_found, binary()}}).
|
||||
|
||||
-export_type([subscriber/0, suboption/0, pubsub_error/0]).
|
||||
|
||||
-define(APP, ?MODULE).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Bootstrap, environment, is_running...
|
||||
%% Bootstrap, environment, configuration, is_running...
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start emqttd application.
|
||||
|
@ -67,52 +81,62 @@ is_running(Node) ->
|
|||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub APIs that wrap emqttd_server, emqttd_pubsub
|
||||
%% PubSub APIs that wrap emqttd_pubsub
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Lookup Topic or Subscription
|
||||
-spec(lookup(topic, binary()) -> [mqtt_topic()];
|
||||
(subscription, binary()) -> [mqtt_subscription()]).
|
||||
lookup(topic, Topic) when is_binary(Topic) ->
|
||||
emqttd_pubsub:lookup_topic(Topic);
|
||||
%% @doc Subscribe
|
||||
-spec(subscribe(iodata()) -> ok | {error, any()}).
|
||||
subscribe(Topic) ->
|
||||
subscribe(Topic, self()).
|
||||
|
||||
lookup(subscription, ClientId) when is_binary(ClientId) ->
|
||||
emqttd_server:lookup_subscription(ClientId).
|
||||
-spec(subscribe(iodata(), subscriber()) -> ok | {error, any()}).
|
||||
subscribe(Topic, Subscriber) ->
|
||||
subscribe(Topic, Subscriber, []).
|
||||
|
||||
%% @doc Create a Topic or Subscription
|
||||
-spec(create(topic | subscription, binary()) -> ok | {error, any()}).
|
||||
create(topic, Topic) when is_binary(Topic) ->
|
||||
emqttd_pubsub:create_topic(Topic);
|
||||
|
||||
create(subscription, {ClientId, Topic, Qos}) ->
|
||||
Subscription = #mqtt_subscription{subid = ClientId, topic = Topic, qos = ?QOS_I(Qos)},
|
||||
emqttd_backend:add_subscription(Subscription).
|
||||
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()).
|
||||
subscribe(Topic, Subscriber, Options) ->
|
||||
with_pubsub(fun(PubSub) -> PubSub:subscribe(iolist_to_binary(Topic), Subscriber, Options) end).
|
||||
|
||||
%% @doc Publish MQTT Message
|
||||
-spec(publish(mqtt_message()) -> ok).
|
||||
publish(Msg) when is_record(Msg, mqtt_message) ->
|
||||
emqttd_server:publish(Msg), ok.
|
||||
|
||||
%% @doc Subscribe
|
||||
-spec(subscribe(binary()) -> ok;
|
||||
({binary(), binary(), mqtt_qos()}) -> ok).
|
||||
subscribe(Topic) when is_binary(Topic) ->
|
||||
emqttd_server:subscribe(Topic);
|
||||
subscribe({ClientId, Topic, Qos}) ->
|
||||
subscribe(ClientId, Topic, Qos).
|
||||
|
||||
-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}).
|
||||
subscribe(ClientId, Topic, Qos) ->
|
||||
emqttd_server:subscribe(ClientId, Topic, Qos).
|
||||
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
|
||||
publish(Msg = #mqtt_message{from = From}) ->
|
||||
trace(publish, From, Msg),
|
||||
case run_hooks('message.publish', [], Msg) of
|
||||
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
|
||||
%% Retain message first. Don't create retained topic.
|
||||
Msg2 = case emqttd_retainer:retain(Msg1) of
|
||||
ok -> emqttd_message:unset_flag(Msg1);
|
||||
ignore -> Msg1
|
||||
end,
|
||||
with_pubsub(fun(PubSub) -> PubSub:publish(Topic, Msg2) end);
|
||||
{stop, Msg1} ->
|
||||
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]),
|
||||
ignore
|
||||
end.
|
||||
|
||||
%% @doc Unsubscribe
|
||||
-spec(unsubscribe(binary()) -> ok).
|
||||
unsubscribe(Topic) when is_binary(Topic) ->
|
||||
emqttd_server:unsubscribe(Topic).
|
||||
-spec(unsubscribe(iodata()) -> ok | pubsub_error()).
|
||||
unsubscribe(Topic) ->
|
||||
unsubscribe(Topic, self()).
|
||||
|
||||
-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok).
|
||||
unsubscribe(ClientId, Topic, Qos) ->
|
||||
emqttd_server:unsubscribe(ClientId, Topic, Qos).
|
||||
-spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()).
|
||||
unsubscribe(Topic, Subscriber) ->
|
||||
with_pubsub(fun(PubSub) -> PubSub:unsubscribe(iolist_to_binary(Topic), Subscriber) end).
|
||||
|
||||
-spec(topics() -> [binary()]).
|
||||
topics() -> with_pubsub(fun(PubSub) -> PubSub:topics() end).
|
||||
|
||||
-spec(subscribers(iodata()) -> list(subscriber())).
|
||||
subscribers(Topic) ->
|
||||
with_pubsub(fun(PubSub) -> PubSub:subscribers(iolist_to_binary(Topic)) end).
|
||||
|
||||
-spec(subscriptions(subscriber()) -> [{binary(), suboption()}]).
|
||||
subscriptions(Subscriber) ->
|
||||
with_pubsub(fun(PubSub) -> PubSub:subscriptions(Subscriber) end).
|
||||
|
||||
with_pubsub(Fun) -> Fun(conf(pubsub_adapter)).
|
||||
|
||||
dump() -> with_pubsub(fun(PubSub) -> lists:append(PubSub:dump(), zenmq_router:dump()) end).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Hooks API
|
||||
|
@ -134,3 +158,15 @@ unhook(Hook, Function) ->
|
|||
run_hooks(Hook, Args, Acc) ->
|
||||
emqttd_hook:run(Hook, Args, Acc).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Trace Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
trace(publish, From, _Msg) when is_atom(From) ->
|
||||
%% Dont' trace '$SYS' publish
|
||||
ignore;
|
||||
|
||||
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
||||
lager:info([{client, From}, {topic, Topic}],
|
||||
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ print_vsn() ->
|
|||
start_servers(Sup) ->
|
||||
Servers = [{"emqttd ctl", emqttd_ctl},
|
||||
{"emqttd hook", emqttd_hook},
|
||||
{"emqttd router", emqttd_router},
|
||||
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
|
||||
{"emqttd stats", emqttd_stats},
|
||||
{"emqttd metrics", emqttd_metrics},
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_backend).
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
|
||||
%% Mnesia Callbacks
|
||||
-export([mnesia/1]).
|
||||
|
||||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
%% Retained Message API
|
||||
-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1,
|
||||
expire_messages/1, retained_count/0]).
|
||||
|
||||
-record(retained_message, {topic, msg}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = emqttd_mnesia:create_table(retained_message, [
|
||||
{type, ordered_set},
|
||||
{disc_copies, [node()]},
|
||||
{record_name, retained_message},
|
||||
{attributes, record_info(fields, retained_message)},
|
||||
{storage_properties, [{ets, [compressed]},
|
||||
{dets, [{auto_save, 1000}]}]}]);
|
||||
|
||||
mnesia(copy) ->
|
||||
ok = emqttd_mnesia:copy_table(retained_message).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Retained Message
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec(retain_message(mqtt_message()) -> ok).
|
||||
retain_message(Msg = #mqtt_message{topic = Topic}) ->
|
||||
mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
|
||||
|
||||
-spec(read_messages(binary()) -> [mqtt_message()]).
|
||||
read_messages(Topic) ->
|
||||
[Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
|
||||
|
||||
-spec(match_messages(binary()) -> [mqtt_message()]).
|
||||
match_messages(Filter) ->
|
||||
%% TODO: optimize later...
|
||||
Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
|
||||
case emqttd_topic:match(Name, Filter) of
|
||||
true -> [Msg|Acc];
|
||||
false -> Acc
|
||||
end
|
||||
end,
|
||||
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
|
||||
|
||||
-spec(delete_message(binary()) -> ok).
|
||||
delete_message(Topic) ->
|
||||
mnesia:dirty_delete(retained_message, Topic).
|
||||
|
||||
-spec(expire_messages(pos_integer()) -> any()).
|
||||
expire_messages(Time) when is_integer(Time) ->
|
||||
mnesia:transaction(
|
||||
fun() ->
|
||||
Match = ets:fun2ms(
|
||||
fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
|
||||
when Time > (MegaSecs * 1000000 + Secs) -> Topic
|
||||
end),
|
||||
Topics = mnesia:select(retained_message, Match, write),
|
||||
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
|
||||
(Topic) -> mnesia:delete({retained_message, Topic})
|
||||
end, Topics)
|
||||
end).
|
||||
|
||||
-spec(retained_count() -> non_neg_integer()).
|
||||
retained_count() ->
|
||||
mnesia:table_info(retained_message, size).
|
||||
|
|
@ -82,6 +82,5 @@ remove(Node) ->
|
|||
end.
|
||||
|
||||
%% @doc Cluster status
|
||||
status() ->
|
||||
emqttd_mnesia:cluster_status().
|
||||
status() -> emqttd_mnesia:cluster_status().
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqttd_pubsub).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-behaviour(gen_server2).
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
@ -24,121 +26,170 @@
|
|||
|
||||
-include("emqttd_internal.hrl").
|
||||
|
||||
%% Mnesia Callbacks
|
||||
-export([mnesia/1]).
|
||||
%% Init And Start
|
||||
-export([init_tabs/0, start_link/3]).
|
||||
|
||||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
%% PubSub API.
|
||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/2,
|
||||
unsubscribe/1, unsubscribe/2]).
|
||||
|
||||
%% API Exports
|
||||
-export([start_link/3, create_topic/1, lookup_topic/1]).
|
||||
%% Async PubSub API.
|
||||
-export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
|
||||
async_unsubscribe/1, async_unsubscribe/2]).
|
||||
|
||||
-export([subscribe/2, unsubscribe/2, publish/2, dispatch/2,
|
||||
async_subscribe/2, async_unsubscribe/2]).
|
||||
%% Management API.
|
||||
-export([setqos/3, topics/0, subscribers/1, is_subscribed/2, subscriptions/1]).
|
||||
|
||||
%% Route API
|
||||
-export([forward/3, dispatch/2]).
|
||||
|
||||
%% Debug API
|
||||
-export([dump/0]).
|
||||
|
||||
%% gen_server.
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {pool, id, env}).
|
||||
-record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}).
|
||||
|
||||
-define(PUBSUB, ?MODULE).
|
||||
|
||||
-define(is_local(Options), lists:member(local, Options)).
|
||||
|
||||
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia callbacks
|
||||
%% Init ETS Tables
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = emqttd_mnesia:create_table(topic, [
|
||||
{ram_copies, [node()]},
|
||||
{record_name, mqtt_topic},
|
||||
{attributes, record_info(fields, mqtt_topic)}]);
|
||||
init_tabs() ->
|
||||
%% Create ETS Tabs
|
||||
lists:foreach(fun create_tab/1, [subscriber, subscription, subproperty]).
|
||||
|
||||
mnesia(copy) ->
|
||||
ok = emqttd_mnesia:copy_table(topic).
|
||||
create_tab(subscriber) ->
|
||||
%% Subscriber: Topic -> Sub1, {Share, Sub2}, {Share, Sub3}, ..., SubN
|
||||
%% duplicate_bag: o(1) insert
|
||||
ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
|
||||
|
||||
create_tab(subscription) ->
|
||||
%% Subscription: Sub -> Topic1, {Share, Topic2}, {Share, Topic3}, ..., TopicN
|
||||
%% bag: o(n) insert
|
||||
ensure_tab(subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]);
|
||||
|
||||
create_tab(subproperty) ->
|
||||
%% Subproperty: {Topic, Sub} -> [local, {qos, 1}, {share, <<"share">>}]
|
||||
ensure_tab(subproperty, [public, named_table, ordered_set | ?CONCURRENCY_OPTS]).
|
||||
|
||||
ensure_tab(Tab, Opts) ->
|
||||
case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Start PubSub
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start one pubsub
|
||||
-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
|
||||
Pool :: atom(),
|
||||
Id :: pos_integer(),
|
||||
Env :: list(tuple())).
|
||||
-spec(start_link(atom(), pos_integer(), [tuple()]) -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link(Pool, Id, Env) ->
|
||||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||
|
||||
%% @doc Create a Topic.
|
||||
-spec(create_topic(binary()) -> ok | {error, any()}).
|
||||
create_topic(Topic) when is_binary(Topic) ->
|
||||
case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of
|
||||
{atomic, ok} -> ok;
|
||||
{aborted, Error} -> {error, Error}
|
||||
end.
|
||||
|
||||
%% @doc Lookup a Topic.
|
||||
-spec(lookup_topic(binary()) -> list(mqtt_topic())).
|
||||
lookup_topic(Topic) when is_binary(Topic) ->
|
||||
mnesia:dirty_read(topic, Topic).
|
||||
gen_server2:start_link({local, ?PROC_NAME(?PUBSUB, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Subscribe a Topic
|
||||
-spec(subscribe(binary(), pid()) -> ok).
|
||||
subscribe(Topic, SubPid) when is_binary(Topic) ->
|
||||
call(pick(Topic), {subscribe, Topic, SubPid}).
|
||||
-spec(subscribe(binary()) -> ok | emqttd:pubsub_error()).
|
||||
subscribe(Topic) when is_binary(Topic) ->
|
||||
subscribe(Topic, self()).
|
||||
|
||||
%% @doc Asynchronous Subscribe
|
||||
-spec(async_subscribe(binary(), pid()) -> ok).
|
||||
async_subscribe(Topic, SubPid) when is_binary(Topic) ->
|
||||
cast(pick(Topic), {subscribe, Topic, SubPid}).
|
||||
-spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
||||
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||
subscribe(Topic, Subscriber, []).
|
||||
|
||||
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
|
||||
ok | emqttd:pubsub_error()).
|
||||
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
||||
call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
||||
|
||||
%% @doc Subscribe a Topic Asynchronously
|
||||
-spec(async_subscribe(binary()) -> ok).
|
||||
async_subscribe(Topic) when is_binary(Topic) ->
|
||||
async_subscribe(Topic, self()).
|
||||
|
||||
-spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
|
||||
async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||
async_subscribe(Topic, Subscriber, []).
|
||||
|
||||
-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
|
||||
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
||||
cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
||||
|
||||
%% @doc Publish message to Topic.
|
||||
-spec(publish(binary(), any()) -> any()).
|
||||
publish(Topic, Msg) ->
|
||||
lists:foreach(
|
||||
fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
|
||||
?MODULE:dispatch(To, Msg);
|
||||
(#mqtt_route{topic = To, node = Node}) ->
|
||||
rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
|
||||
end, emqttd_router:match(Topic)).
|
||||
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore).
|
||||
publish(Topic, Msg) when is_binary(Topic) ->
|
||||
route(emqttd_router:match(Topic), delivery(Msg)).
|
||||
|
||||
%% Dispatch on the local node
|
||||
route([#mqtt_route{topic = To, node = Node}],
|
||||
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
|
||||
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
|
||||
%% Forward to other nodes
|
||||
route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
|
||||
forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]});
|
||||
|
||||
route(Routes, Delivery) ->
|
||||
{ok, lists:foldl(fun(Route, DelAcc) ->
|
||||
{ok, DelAcc1} = route([Route], DelAcc), DelAcc1
|
||||
end, Delivery, Routes)}.
|
||||
|
||||
delivery(Msg) -> #mqtt_delivery{message = Msg, flows = []}.
|
||||
|
||||
%% @doc Forward message to another node...
|
||||
forward(Node, To, Delivery) ->
|
||||
rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}.
|
||||
|
||||
%% @doc Dispatch Message to Subscribers
|
||||
-spec(dispatch(binary(), mqtt_message()) -> ok).
|
||||
dispatch(Queue = <<"$queue/", _Q/binary>>, Msg) ->
|
||||
case subscribers(Queue) of
|
||||
[] ->
|
||||
dropped(Queue);
|
||||
[SubPid] ->
|
||||
SubPid ! {dispatch, Queue, Msg};
|
||||
SubPids ->
|
||||
Idx = crypto:rand_uniform(1, length(SubPids) + 1),
|
||||
SubPid = lists:nth(Idx, SubPids),
|
||||
SubPid ! {dispatch, Queue, Msg}
|
||||
end;
|
||||
|
||||
dispatch(Topic, Msg) ->
|
||||
-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()).
|
||||
dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
|
||||
case subscribers(Topic) of
|
||||
[] ->
|
||||
dropped(Topic);
|
||||
[SubPid] ->
|
||||
SubPid ! {dispatch, Topic, Msg};
|
||||
SubPids ->
|
||||
lists:foreach(fun(SubPid) ->
|
||||
SubPid ! {dispatch, Topic, Msg}
|
||||
end, SubPids)
|
||||
dropped(Topic), {ok, Delivery};
|
||||
[Sub] ->
|
||||
dispatch(Sub, Topic, Msg),
|
||||
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}};
|
||||
Subscribers ->
|
||||
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
|
||||
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
|
||||
{ok, Delivery#mqtt_delivery{flows = Flows1}}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Find all subscribers
|
||||
dispatch(Pid, Topic, Msg) when is_pid(Pid) ->
|
||||
Pid ! {dispatch, Topic, Msg};
|
||||
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
|
||||
emqttd_sm:dispatch(SubId, Topic, Msg).
|
||||
|
||||
topics() -> emqttd_router:topics().
|
||||
|
||||
subscribers(Topic) ->
|
||||
case ets:member(subscriber, Topic) of
|
||||
true -> %% faster then lookup?
|
||||
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
|
||||
false ->
|
||||
[]
|
||||
end.
|
||||
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
|
||||
|
||||
subscriptions(Subscriber) ->
|
||||
lists:map(fun({_, Topic}) ->
|
||||
subscription(Topic, Subscriber)
|
||||
end, ets:lookup(subscription, Subscriber)).
|
||||
|
||||
subscription(Topic, Subscriber) ->
|
||||
{Topic, ets:lookup_element(subproperty, {Topic, Subscriber}, 2)}.
|
||||
|
||||
is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
|
||||
ets:member(subproperty, {Topic, Subscriber}).
|
||||
|
||||
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
||||
call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
||||
|
||||
dump() ->
|
||||
[{subscriber, ets:tab2list(subscriber)},
|
||||
{subscription, ets:tab2list(subscription)},
|
||||
{subproperty, ets:tab2list(subproperty)}].
|
||||
|
||||
%% @private
|
||||
%% @doc Ingore $SYS Messages.
|
||||
|
@ -148,14 +199,23 @@ dropped(_Topic) ->
|
|||
emqttd_metrics:inc('messages/dropped').
|
||||
|
||||
%% @doc Unsubscribe
|
||||
-spec(unsubscribe(binary(), pid()) -> ok).
|
||||
unsubscribe(Topic, SubPid) when is_binary(Topic) ->
|
||||
call(pick(Topic), {unsubscribe, Topic, SubPid}).
|
||||
-spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
|
||||
unsubscribe(Topic) when is_binary(Topic) ->
|
||||
unsubscribe(Topic, self()).
|
||||
|
||||
%% @doc Asynchronous Unsubscribe
|
||||
-spec(async_unsubscribe(binary(), pid()) -> ok).
|
||||
async_unsubscribe(Topic, SubPid) when is_binary(Topic) ->
|
||||
cast(pick(Topic), {unsubscribe, Topic, SubPid}).
|
||||
%% @doc Unsubscribe
|
||||
-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
||||
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||
call(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
||||
|
||||
%% @doc Async Unsubscribe
|
||||
-spec(async_unsubscribe(binary()) -> ok).
|
||||
async_unsubscribe(Topic) when is_binary(Topic) ->
|
||||
async_unsubscribe(Topic, self()).
|
||||
|
||||
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
||||
async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||
cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
||||
|
||||
call(PubSub, Req) when is_pid(PubSub) ->
|
||||
gen_server2:call(PubSub, Req, infinity).
|
||||
|
@ -172,30 +232,56 @@ pick(Topic) ->
|
|||
|
||||
init([Pool, Id, Env]) ->
|
||||
?GPROC_POOL(join, Pool, Id),
|
||||
{ok, #state{pool = Pool, id = Id, env = Env}}.
|
||||
{ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
||||
|
||||
handle_call({subscribe, Topic, SubPid}, _From, State) ->
|
||||
add_subscriber_(Topic, SubPid),
|
||||
{reply, ok, setstats(State)};
|
||||
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
||||
case do_subscribe_(Topic, Subscriber, Options, State) of
|
||||
{ok, NewState} -> {reply, ok, setstats(NewState)};
|
||||
{error, Error} -> {reply, {error, Error}, State}
|
||||
end;
|
||||
|
||||
handle_call({unsubscribe, Topic, SubPid}, _From, State) ->
|
||||
del_subscriber_(Topic, SubPid),
|
||||
{reply, ok, setstats(State)};
|
||||
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
||||
case do_unsubscribe_(Topic, Subscriber, State) of
|
||||
{ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
||||
{error, Error} -> {reply, {error, Error}, State}
|
||||
end;
|
||||
|
||||
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
|
||||
Key = {Topic, Subscriber},
|
||||
case ets:lookup(subproperty, Key) of
|
||||
[{_, Opts}] ->
|
||||
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
|
||||
ets:insert(subproperty, {Key, Opts1}),
|
||||
{reply, ok, State};
|
||||
[] ->
|
||||
{reply, {error, {subscription_not_found, Topic}}, State}
|
||||
end;
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?UNEXPECTED_REQ(Req, State).
|
||||
|
||||
handle_cast({subscribe, Topic, SubPid}, State) ->
|
||||
add_subscriber_(Topic, SubPid),
|
||||
{noreply, setstats(State)};
|
||||
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
||||
case do_subscribe_(Topic, Subscriber, Options, State) of
|
||||
{ok, NewState} -> {noreply, setstats(NewState)};
|
||||
{error, _Error} -> {noreply, State}
|
||||
end;
|
||||
|
||||
handle_cast({unsubscribe, Topic, SubPid}, State) ->
|
||||
del_subscriber_(Topic, SubPid),
|
||||
{noreply, setstats(State)};
|
||||
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
||||
case do_unsubscribe_(Topic, Subscriber, State) of
|
||||
{ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
||||
{error, _Error} -> {noreply, State}
|
||||
end;
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?UNEXPECTED_MSG(Msg, State).
|
||||
|
||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
||||
lists:foreach(fun({_, Topic}) ->
|
||||
subscriber_down_(DownPid, Topic)
|
||||
end, ets:lookup(subscription, DownPid)),
|
||||
ets:delete(subscription, DownPid),
|
||||
{noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?UNEXPECTED_INFO(Info, State).
|
||||
|
||||
|
@ -209,62 +295,79 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
add_subscriber_(Topic, SubPid) ->
|
||||
case ets:member(subscriber, Topic) of
|
||||
false ->
|
||||
mnesia:transaction(fun add_topic_route_/2, [Topic, node()]),
|
||||
setstats(topic);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
ets:insert(subscriber, {Topic, SubPid}).
|
||||
|
||||
del_subscriber_(Topic, SubPid) ->
|
||||
ets:delete_object(subscriber, {Topic, SubPid}),
|
||||
case ets:lookup(subscriber, Topic) of
|
||||
do_subscribe_(Topic, Subscriber, Options, State) ->
|
||||
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
||||
[] ->
|
||||
mnesia:transaction(fun del_topic_route_/2, [Topic, node()]),
|
||||
setstats(topic);
|
||||
[_|_] ->
|
||||
ok
|
||||
do_subscribe2_(Topic, Subscriber, Options),
|
||||
ets:insert(subproperty, {{Topic, Subscriber}, Options}),
|
||||
{ok, monitor_subpid(Subscriber, State)};
|
||||
[_] ->
|
||||
{error, {already_subscribed, Topic}}
|
||||
end.
|
||||
|
||||
add_topic_route_(Topic, Node) ->
|
||||
add_topic_(Topic), emqttd_router:add_route(Topic, Node).
|
||||
do_subscribe2_(Topic, Subscriber, _Options) ->
|
||||
add_subscription_(Subscriber, Topic),
|
||||
add_subscriber_(Topic, Subscriber).
|
||||
|
||||
add_topic_(Topic) ->
|
||||
add_topic_(Topic, []).
|
||||
add_subscription_(Subscriber, Topic) ->
|
||||
ets:insert(subscription, {Subscriber, Topic}).
|
||||
|
||||
add_topic_(Topic, Flags) ->
|
||||
Record = #mqtt_topic{topic = Topic, flags = Flags},
|
||||
case mnesia:wread({topic, Topic}) of
|
||||
[] -> mnesia:write(topic, Record, write);
|
||||
[_] -> ok
|
||||
add_subscriber_(Topic, Subscriber) ->
|
||||
%%TODO: LOCK here...
|
||||
case ets:member(subscriber, Topic) of
|
||||
false -> emqttd_router:add_route(Topic, node());
|
||||
true -> ok
|
||||
end,
|
||||
ets:insert(subscriber, {Topic, Subscriber}).
|
||||
|
||||
do_unsubscribe_(Topic, Subscriber, State) ->
|
||||
case ets:lookup(subproperty, {Topic, Subscriber}) of
|
||||
[_] ->
|
||||
del_subscriber_(Topic, Subscriber),
|
||||
del_subscription(Subscriber, Topic),
|
||||
ets:delete(subproperty, {Topic, Subscriber}),
|
||||
{ok, case ets:member(subscription, Subscriber) of
|
||||
true -> State;
|
||||
false -> demonitor_subpid(Subscriber, State)
|
||||
end};
|
||||
[] ->
|
||||
{error, {subscription_not_found, Topic}}
|
||||
end.
|
||||
|
||||
del_topic_route_(Topic, Node) ->
|
||||
emqttd_router:del_route(Topic, Node), del_topic_(Topic).
|
||||
del_subscription(Subscriber, Topic) ->
|
||||
ets:delete_object(subscription, {Subscriber, Topic}).
|
||||
|
||||
del_topic_(Topic) ->
|
||||
case emqttd_router:has_route(Topic) of
|
||||
true -> ok;
|
||||
false -> do_del_topic_(Topic)
|
||||
del_subscriber_(Topic, Subscriber) ->
|
||||
ets:delete_object(subscriber, {Topic, Subscriber}),
|
||||
%%TODO: LOCK TOPIC
|
||||
case ets:member(subscriber, Topic) of
|
||||
false -> emqttd_router:del_route(Topic, node());
|
||||
true -> ok
|
||||
end.
|
||||
|
||||
do_del_topic_(Topic) ->
|
||||
case mnesia:wread({topic, Topic}) of
|
||||
[#mqtt_topic{flags = []}] ->
|
||||
mnesia:delete(topic, Topic, write);
|
||||
_ ->
|
||||
ok
|
||||
subscriber_down_(DownPid, Topic) ->
|
||||
case ets:lookup(subproperty, {Topic, DownPid}) of
|
||||
[] -> del_subscriber_(Topic, DownPid); %%TODO: warning?
|
||||
[_] -> del_subscriber_(Topic, DownPid),
|
||||
ets:delete(subproperty, {Topic, DownPid})
|
||||
end.
|
||||
|
||||
monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
||||
State#state{submon = PMon:monitor(SubPid)};
|
||||
monitor_subpid(_SubPid, State) ->
|
||||
State.
|
||||
|
||||
demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
||||
State#state{submon = PMon:demonitor(SubPid)};
|
||||
demonitor_subpid(_SubPid, State) ->
|
||||
State.
|
||||
|
||||
setstats(State) when is_record(State, state) ->
|
||||
setstats(subscriber), State;
|
||||
|
||||
setstats(topic) ->
|
||||
emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size));
|
||||
setstats(subscriber), setstats(subscription), State;
|
||||
|
||||
setstats(subscriber) ->
|
||||
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)).
|
||||
emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size));
|
||||
|
||||
setstats(subscription) ->
|
||||
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', ets:info(subscription, size)).
|
||||
|
||||
|
|
|
@ -19,10 +19,6 @@
|
|||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
|
||||
|
||||
%% API
|
||||
-export([start_link/0, pubsub_pool/0]).
|
||||
|
||||
|
@ -36,41 +32,13 @@ pubsub_pool() ->
|
|||
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
|
||||
|
||||
init([Env]) ->
|
||||
|
||||
%% Create ETS Tabs
|
||||
create_tab(subscriber), create_tab(subscribed),
|
||||
|
||||
%% Router
|
||||
Router = {router, {emqttd_router, start_link, []},
|
||||
permanent, 5000, worker, [emqttd_router]},
|
||||
|
||||
%% PubSub Pool Sup
|
||||
PubSubMFA = {emqttd_pubsub, start_link, [Env]},
|
||||
PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]),
|
||||
|
||||
%% Server Pool Sup
|
||||
ServerMFA = {emqttd_server, start_link, [Env]},
|
||||
ServerPoolSup = emqttd_pool_sup:spec(server_pool, [server, hash, pool_size(Env), ServerMFA]),
|
||||
|
||||
{ok, {{one_for_all, 5, 60}, [Router, PubSubPoolSup, ServerPoolSup]}}.
|
||||
PubSub = emqttd:conf(pubsub_adapter),
|
||||
PubSubMFA = {PubSub, start_link, [Env]},
|
||||
PoolArgs = [pubsub, hash, pool_size(Env), PubSubMFA],
|
||||
PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, PoolArgs),
|
||||
{ok, { {one_for_all, 10, 3600}, [PubSubPoolSup]} }.
|
||||
|
||||
pool_size(Env) ->
|
||||
Schedulers = erlang:system_info(schedulers),
|
||||
proplists:get_value(pool_size, Env, Schedulers).
|
||||
|
||||
create_tab(subscriber) ->
|
||||
%% subscriber: Topic -> Pid1, Pid2, ..., PidN
|
||||
%% duplicate_bag: o(1) insert
|
||||
ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
|
||||
|
||||
create_tab(subscribed) ->
|
||||
%% subscribed: Pid -> Topic1, Topic2, ..., TopicN
|
||||
%% bag: o(n) insert
|
||||
ensure_tab(subscribed, [public, named_table, bag | ?CONCURRENCY_OPTS]).
|
||||
|
||||
ensure_tab(Tab, Opts) ->
|
||||
case ets:info(Tab, name) of
|
||||
undefined -> ets:new(Tab, Opts);
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -1,278 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_server).
|
||||
|
||||
-behaviour(gen_server2).
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-include("emqttd_protocol.hrl").
|
||||
|
||||
-include("emqttd_internal.hrl").
|
||||
|
||||
%% Mnesia Callbacks
|
||||
-export([mnesia/1]).
|
||||
|
||||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
%% API Exports
|
||||
-export([start_link/3]).
|
||||
|
||||
%% PubSub API
|
||||
-export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3,
|
||||
lookup_subscription/1, update_subscription/4]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {pool, id, env, monitors}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mnesia callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mnesia(boot) ->
|
||||
ok = emqttd_mnesia:create_table(subscription, [
|
||||
{type, bag},
|
||||
{ram_copies, [node()]},
|
||||
{local_content, true}, %% subscription table is local
|
||||
{record_name, mqtt_subscription},
|
||||
{attributes, record_info(fields, mqtt_subscription)}]);
|
||||
|
||||
mnesia(copy) ->
|
||||
ok = emqttd_mnesia:copy_table(subscription).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Start server
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start a Server
|
||||
-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when
|
||||
Pool :: atom(),
|
||||
Id :: pos_integer(),
|
||||
Env :: list(tuple())).
|
||||
start_link(Pool, Id, Env) ->
|
||||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% PubSub API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Subscribe a Topic
|
||||
-spec(subscribe(binary()) -> ok).
|
||||
subscribe(Topic) when is_binary(Topic) ->
|
||||
From = self(), call(server(From), {subscribe, From, Topic}).
|
||||
|
||||
%% @doc Subscribe from a MQTT session.
|
||||
-spec(subscribe(binary(), binary(), mqtt_qos()) -> ok).
|
||||
subscribe(ClientId, Topic, Qos) ->
|
||||
From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
|
||||
|
||||
%% @doc Lookup subscriptions.
|
||||
-spec(lookup_subscription(binary()) -> [#mqtt_subscription{}]).
|
||||
lookup_subscription(ClientId) ->
|
||||
mnesia:dirty_read(subscription, ClientId).
|
||||
|
||||
%% @doc Update a subscription.
|
||||
-spec(update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok).
|
||||
update_subscription(ClientId, Topic, OldQos, NewQos) ->
|
||||
call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}).
|
||||
|
||||
%% @doc Publish a Message
|
||||
-spec(publish(Msg :: mqtt_message()) -> any()).
|
||||
publish(Msg = #mqtt_message{from = From}) ->
|
||||
trace(publish, From, Msg),
|
||||
case emqttd:run_hooks('message.publish', [], Msg) of
|
||||
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
|
||||
%% Retain message first. Don't create retained topic.
|
||||
Msg2 = case emqttd_retainer:retain(Msg1) of
|
||||
ok -> emqttd_message:unset_flag(Msg1);
|
||||
ignore -> Msg1
|
||||
end,
|
||||
emqttd_pubsub:publish(Topic, Msg2);
|
||||
{stop, Msg1} ->
|
||||
lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)])
|
||||
end.
|
||||
|
||||
%% @doc Unsubscribe a Topic
|
||||
-spec(unsubscribe(binary()) -> ok).
|
||||
unsubscribe(Topic) when is_binary(Topic) ->
|
||||
From = self(), call(server(From), {unsubscribe, From, Topic}).
|
||||
|
||||
%% @doc Unsubscribe a Topic from a MQTT session
|
||||
-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok).
|
||||
unsubscribe(ClientId, Topic, Qos) ->
|
||||
From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}).
|
||||
|
||||
call(Server, Req) ->
|
||||
gen_server2:call(Server, Req, infinity).
|
||||
|
||||
server(From) ->
|
||||
gproc_pool:pick_worker(server, From).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([Pool, Id, Env]) ->
|
||||
?GPROC_POOL(join, Pool, Id),
|
||||
{ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}.
|
||||
|
||||
handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
|
||||
pubsub_subscribe_(SubPid, Topic),
|
||||
if_subsciption(State, fun() ->
|
||||
add_subscription_(ClientId, Topic, Qos),
|
||||
set_subscription_stats()
|
||||
end),
|
||||
ok(monitor_subscriber_(ClientId, SubPid, State));
|
||||
|
||||
handle_call({subscribe, SubPid, Topic}, _From, State) ->
|
||||
pubsub_subscribe_(SubPid, Topic),
|
||||
ok(monitor_subscriber_(undefined, SubPid, State));
|
||||
|
||||
handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) ->
|
||||
if_subsciption(State, fun() ->
|
||||
OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos},
|
||||
NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos},
|
||||
mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]),
|
||||
set_subscription_stats()
|
||||
end), ok(State);
|
||||
|
||||
handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
|
||||
pubsub_unsubscribe_(SubPid, Topic),
|
||||
if_subsciption(State, fun() ->
|
||||
del_subscription_(ClientId, Topic, Qos),
|
||||
set_subscription_stats()
|
||||
end), ok(State);
|
||||
|
||||
handle_call({unsubscribe, SubPid, Topic}, _From, State) ->
|
||||
pubsub_unsubscribe_(SubPid, Topic), ok(State);
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?UNEXPECTED_REQ(Req, State).
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?UNEXPECTED_MSG(Msg, State).
|
||||
|
||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = Monitors}) ->
|
||||
%% unsubscribe
|
||||
lists:foreach(fun({_, Topic}) ->
|
||||
emqttd_pubsub:async_unsubscribe(Topic, DownPid)
|
||||
end, ets:lookup(subscribed, DownPid)),
|
||||
ets:delete(subscribed, DownPid),
|
||||
|
||||
%% clean subscriptions
|
||||
case dict:find(DownPid, Monitors) of
|
||||
{ok, {undefined, _}} -> ok;
|
||||
{ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId);
|
||||
error -> ok
|
||||
end,
|
||||
{noreply, State#state{monitors = dict:erase(DownPid, Monitors)}, hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?UNEXPECTED_INFO(Info, State).
|
||||
|
||||
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||
?GPROC_POOL(leave, Pool, Id).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
if_subsciption(#state{env = Env}, Fun) ->
|
||||
case proplists:get_value(subscription, Env, true) of
|
||||
false -> ok;
|
||||
_true -> Fun()
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Add a subscription.
|
||||
-spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok).
|
||||
add_subscription_(ClientId, Topic, Qos) ->
|
||||
add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
|
||||
|
||||
-spec(add_subscription_(mqtt_subscription()) -> ok).
|
||||
add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
|
||||
mnesia:dirty_write(subscription, Subscription).
|
||||
|
||||
update_subscription_(OldSub, NewSub) ->
|
||||
mnesia:delete_object(subscription, OldSub, write),
|
||||
mnesia:write(subscription, NewSub, write).
|
||||
|
||||
%% @private
|
||||
%% @doc Delete a subscription
|
||||
-spec(del_subscription_(binary(), binary(), mqtt_qos()) -> ok).
|
||||
del_subscription_(ClientId, Topic, Qos) ->
|
||||
del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}).
|
||||
|
||||
del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) ->
|
||||
mnesia:dirty_delete_object(subscription, Subscription).
|
||||
|
||||
%% @private
|
||||
%% @doc Call pubsub to subscribe
|
||||
pubsub_subscribe_(SubPid, Topic) ->
|
||||
case ets:match(subscribed, {SubPid, Topic}) of
|
||||
[] ->
|
||||
emqttd_pubsub:async_subscribe(Topic, SubPid),
|
||||
ets:insert(subscribed, {SubPid, Topic});
|
||||
[_] ->
|
||||
false
|
||||
end.
|
||||
|
||||
%% @private
|
||||
pubsub_unsubscribe_(SubPid, Topic) ->
|
||||
emqttd_pubsub:async_unsubscribe(Topic, SubPid),
|
||||
ets:delete_object(subscribed, {SubPid, Topic}).
|
||||
|
||||
monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) ->
|
||||
case dict:find(SubPid, Monitors) of
|
||||
{ok, _} ->
|
||||
State;
|
||||
error ->
|
||||
MRef = erlang:monitor(process, SubPid),
|
||||
State#state{monitors = dict:store(SubPid, {ClientId, MRef}, Monitors)}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Trace Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
trace(publish, From, _Msg) when is_atom(From) ->
|
||||
%% Dont' trace '$SYS' publish
|
||||
ignore;
|
||||
|
||||
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
||||
lager:info([{client, From}, {topic, Topic}],
|
||||
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Subscription Statistics
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
set_subscription_stats() ->
|
||||
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
|
||||
mnesia:table_info(subscription, size)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
ok(State) -> {reply, ok, State}.
|
||||
|
|
@ -297,11 +297,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id =
|
|||
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
|
||||
SubDict;
|
||||
{ok, OldQos} ->
|
||||
emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
|
||||
emqttd_pubsub:setqos(Topic, ClientId, Qos),
|
||||
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
|
||||
dict:store(Topic, Qos, SubDict);
|
||||
error ->
|
||||
emqttd:subscribe(ClientId, Topic, Qos),
|
||||
emqttd:subscribe(Topic, ClientId, [{qos, Qos}]),
|
||||
%%TODO: the design is ugly...
|
||||
%% <MQTT V3.1.1>: 3.8.4
|
||||
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||
|
|
Loading…
Reference in New Issue