From 26655f1ee3c3f89cf15ef70bd0d270215337b5c7 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 8 Mar 2016 13:25:19 +0800 Subject: [PATCH] emqttd:publish/1 --- src/emqttd_backend.erl | 90 ++++++++ src/emqttd_http.erl | 2 +- src/emqttd_pubsub.erl | 265 +++++++++++++++++++++++ src/emqttd_pubsub_old.erl | 414 ------------------------------------ src/emqttd_pubsub_sup.erl | 67 ++---- src/emqttd_router.erl | 225 ++++++++++++++++++++ src/emqttd_server.erl | 254 ++++++++++++++++++++++ src/lager_emqtt_backend.erl | 4 +- 8 files changed, 859 insertions(+), 462 deletions(-) create mode 100644 src/emqttd_backend.erl create mode 100644 src/emqttd_pubsub.erl delete mode 100644 src/emqttd_pubsub_old.erl create mode 100644 src/emqttd_router.erl create mode 100644 src/emqttd_server.erl diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl new file mode 100644 index 000000000..39c1a8c2d --- /dev/null +++ b/src/emqttd_backend.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_backend). + +-include("emqttd.hrl"). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% API. +-export([add_static_subscription/1, lookup_static_subscriptions/1, + del_static_subscriptions/1, del_static_subscription/2]). + +%%-------------------------------------------------------------------- +%% Mnesia callbacks +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(static_subscription, [ + {type, bag}, + {disc_copies, [node()]}, + {record_name, mqtt_subscription}, + {attributes, record_info(fields, mqtt_subscription)}, + {storage_properties, [{ets, [compressed]}, + {dets, [{auto_save, 5000}]}]}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(static_subscription). + +%%-------------------------------------------------------------------- +%% Static Subscriptions +%%-------------------------------------------------------------------- + +%% @doc Add a static subscription manually. +-spec add_static_subscription(mqtt_subscription()) -> {atom, ok}. +add_static_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> + Pattern = match_pattern(SubId, Topic), + mnesia:transaction( + fun() -> + case mnesia:match_object(static_subscription, Pattern, write) of + [] -> + mnesia:write(static_subscription, Subscription, write); + [Subscription] -> + mnesia:abort({error, existed}); + [Subscription1] -> %% QoS is different + mnesia:delete_object(static_subscription, Subscription1, write), + mnesia:write(static_subscription, Subscription, write) + end + end). + +%% @doc Lookup static subscriptions. +-spec lookup_static_subscriptions(binary()) -> list(mqtt_subscription()). +lookup_static_subscriptions(ClientId) when is_binary(ClientId) -> + mnesia:dirty_read(static_subscription, ClientId). + +%% @doc Delete static subscriptions by ClientId manually. +-spec del_static_subscriptions(binary()) -> ok. +del_static_subscriptions(ClientId) when is_binary(ClientId) -> + mnesia:transaction(fun mnesia:delete/1, [{static_subscription, ClientId}]). + +%% @doc Delete a static subscription manually. +-spec del_static_subscription(binary(), binary()) -> ok. +del_static_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> + mnesia:transaction(fun del_static_subscription_/1, [match_pattern(ClientId, Topic)]). + +del_static_subscription_(Pattern) -> + lists:foreach(fun(Subscription) -> + mnesia:delete_object(static_subscription, Subscription, write) + end, mnesia:match_object(static_subscription, Pattern, write)). + +match_pattern(SubId, Topic) -> + #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}. + diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index d673b14d5..36dc778d9 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -56,7 +56,7 @@ handle_request('POST', "/mqtt/publish", Req) -> case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), - emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), + emqttd:publish(Msg#mqtt_message{retain = Retain}), Req:ok({"text/plain", <<"ok">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl new file mode 100644 index 000000000..de1cf245c --- /dev/null +++ b/src/emqttd_pubsub.erl @@ -0,0 +1,265 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_pubsub). + +-behaviour(gen_server2). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-include("emqttd_internal.hrl"). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% API Exports +-export([start_link/3, create_topic/1, lookup_topic/1]). + +-export([subscribe/2, unsubscribe/2, publish/2, dispatch/2, + async_subscribe/2, async_unsubscribe/2]). + +%% gen_server. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {pool, id, env}). + +%%-------------------------------------------------------------------- +%% Mnesia callbacks +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(topic, [ + {ram_copies, [node()]}, + {record_name, mqtt_topic}, + {attributes, record_info(fields, mqtt_topic)}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(topic). + +%%-------------------------------------------------------------------- +%% Start PubSub +%%-------------------------------------------------------------------- + +%% @doc Start one pubsub +-spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), + Env :: list(tuple()). +start_link(Pool, Id, Env) -> + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + +%% @doc Create a Topic. +-spec create_topic(emqttd_topic:topic()) -> ok | {error, any()}. +create_topic(Topic) when is_binary(Topic) -> + case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end. + +%% @doc Lookup a Topic. +-spec lookup_topic(emqttd_topic:topic()) -> list(mqtt_topic()). +lookup_topic(Topic) when is_binary(Topic) -> + mnesia:dirty_read(topic, Topic). + +%%-------------------------------------------------------------------- +%% PubSub API +%%-------------------------------------------------------------------- + +%% @doc Subscribe a Topic +-spec subscribe(binary(), pid()) -> ok. +subscribe(Topic, SubPid) when is_binary(Topic) -> + call(pick(Topic), {subscribe, Topic, SubPid}). + +%% @doc Asynchronous Subscribe +-spec async_subscribe(binary(), pid()) -> ok. +async_subscribe(Topic, SubPid) when is_binary(Topic) -> + cast(pick(Topic), {subscribe, Topic, SubPid}). + +%% @doc Publish message to Topic. +-spec publish(binary(), any()) -> ok. +publish(Topic, Msg) -> + lists:foreach( + fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> + ?MODULE:dispatch(To, Msg); + (#mqtt_route{topic = To, node = Node}) -> + rpc:cast(Node, ?MODULE, dispatch, [To, Msg]) + end, emqttd_router:lookup(Topic)). + +%% @doc Dispatch Message to Subscribers +-spec dispatch(binary(), mqtt_message()) -> ok. +dispatch(Queue = <<"$Q/", _Q>>, Msg) -> + case subscribers(Queue) of + [] -> + dropped(Queue); + [SubPid] -> + SubPid ! {dispatch, Queue, Msg}; + SubPids -> + Idx = crypto:rand_uniform(1, length(SubPids) + 1), + SubPid = lists:nth(Idx, SubPids), + SubPid ! {dispatch, Queue, Msg} + end; + +dispatch(Topic, Msg) -> + case subscribers(Topic) of + [] -> + dropped(Topic); + [SubPid] -> + SubPid ! {dispatch, Topic, Msg}; + SubPids -> + lists:foreach(fun(SubPid) -> + SubPid ! {dispatch, Topic, Msg} + end, SubPids) + end. + +%% @private +%% @doc Find all subscribers +subscribers(Topic) -> + case ets:member(subscriber, Topic) of + true -> %% faster then lookup? + try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end; + false -> + [] + end. + +%% @private +%% @doc Ingore $SYS Messages. +dropped(<<"$SYS/", _/binary>>) -> + ok; +dropped(_Topic) -> + emqttd_metrics:inc('messages/dropped'). + +%% @doc Unsubscribe +-spec unsubscribe(binary(), pid()) -> ok. +unsubscribe(Topic, SubPid) when is_binary(Topic) -> + call(pick(Topic), {unsubscribe, Topic, SubPid}). + +%% @doc Asynchronous Unsubscribe +-spec async_unsubscribe(binary(), pid()) -> ok. +async_unsubscribe(Topic, SubPid) when is_binary(Topic) -> + cast(pick(Topic), {unsubscribe, Topic, SubPid}). + +call(PubSub, Req) when is_pid(PubSub) -> + gen_server2:call(PubSub, Req, infinity). + +cast(PubSub, Msg) when is_pid(PubSub) -> + gen_server2:cast(PubSub, Msg). + +pick(Topic) -> gproc_pool:pick_worker(pubsub, Topic). + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id, Env]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id, env = Env}}. + +handle_call({subscribe, Topic, SubPid}, _From, State) -> + add_subscriber_(Topic, SubPid), + {reply, ok, setstats(State)}; + +handle_call({unsubscribe, Topic, SubPid}, _From, State) -> + del_subscriber_(Topic, SubPid), + {reply, ok, setstats(State)}; + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast({subscribe, Topic, SubPid}, State) -> + add_subscriber_(Topic, SubPid), + {noreply, setstats(State)}; + +handle_cast({unsubscribe, Topic, SubPid}, State) -> + del_subscriber_(Topic, SubPid), + {noreply, setstats(State)}; + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +add_subscriber_(Topic, SubPid) -> + case ets:member(subscriber, Topic) of + false -> + mnesia:transaction(fun add_topic_/1, [Topic]), + emqttd_router:add_route(Topic, node()), + setstats(topic); + true -> + ok + end, + ets:insert(subscriber, {Topic, SubPid}). + +del_subscriber_(Topic, SubPid) -> + ets:delete_object(subscriber, {Topic, SubPid}), + case ets:lookup(subscriber, Topic) of + [] -> + emqttd_router:del_route(Topic, node()), + mnesia:transaction(fun del_topic_/1, [Topic]), + setstats(topic); + [_|_] -> + ok + end. + +add_topic_(Topic) -> + add_topic_(Topic, []). + +add_topic_(Topic, Flags) -> + Record = #mqtt_topic{topic = Topic, flags = Flags}, + case mnesia:wread({topic, Topic}) of + [] -> mnesia:write(topic, Record, write); + [_] -> ok + end. + +del_topic_(Topic) -> + case emqttd_router:has_route(Topic) of + true -> ok; + false -> do_del_topic_(Topic) + end. + +do_del_topic_(Topic) -> + case mnesia:wread({topic, Topic}) of + [#mqtt_topic{flags = []}] -> + mnesia:delete(topic, Topic, write); + _ -> + ok + end. + +setstats(State) when is_record(State, state) -> + setstats(subscriber), State; + +setstats(topic) -> + emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); + +setstats(subscriber) -> + emqttd_stats:setstats('subscribers/count', 'subscribers/max', ets:info(subscriber, size)). + diff --git a/src/emqttd_pubsub_old.erl b/src/emqttd_pubsub_old.erl deleted file mode 100644 index eb60fdb1a..000000000 --- a/src/emqttd_pubsub_old.erl +++ /dev/null @@ -1,414 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_pubsub). - --behaviour(gen_server2). - --include("emqttd.hrl"). - --include("emqttd_protocol.hrl"). - --include("emqttd_internal.hrl"). - -%% Mnesia Callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% API Exports --export([start_link/4]). - --export([create/2, lookup/2, subscribe/1, subscribe/2, - publish/1, unsubscribe/1, unsubscribe/2, delete/2]). - -%% Local node --export([match/1]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool, id, statsfun}). - --define(ROUTER, emqttd_router). - -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- -mnesia(boot) -> - ok = create_table(topic, ram_copies), - if_subscription(fun(RamOrDisc) -> - ok = create_table(subscription, RamOrDisc) - end); - -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(topic), - %% Only one disc_copy??? - if_subscription(fun(_RamOrDisc) -> - ok = emqttd_mnesia:copy_table(subscription) - end). - -%% Topic Table -create_table(topic, RamOrDisc) -> - emqttd_mnesia:create_table(topic, [ - {type, bag}, - {RamOrDisc, [node()]}, - {record_name, mqtt_topic}, - {attributes, record_info(fields, mqtt_topic)}]); - -%% Subscription Table -create_table(subscription, RamOrDisc) -> - emqttd_mnesia:create_table(subscription, [ - {type, bag}, - {RamOrDisc, [node()]}, - {record_name, mqtt_subscription}, - {attributes, record_info(fields, mqtt_subscription)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 5000}]}]}]). - -if_subscription(Fun) -> - case env(subscription) of - disc -> Fun(disc_copies); - ram -> Fun(ram_copies); - false -> ok; - undefined -> ok - end. - -env(Key) -> - case get({pubsub, Key}) of - undefined -> - cache_env(Key); - Val -> - Val - end. - -cache_env(Key) -> - Val = proplists:get_value(Key, emqttd_broker:env(pubsub)), - put({pubsub, Key}, Val), - Val. - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Start one pubsub server --spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - StatsFun :: fun((atom()) -> any()), - Opts :: list(tuple()). -start_link(Pool, Id, StatsFun, Opts) -> - gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, - ?MODULE, [Pool, Id, StatsFun, Opts], []). - -%% @doc Create Topic or Subscription. --spec create(topic, emqttd_topic:topic()) -> ok | {error, any()}; - (subscription, {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}. -create(topic, Topic) when is_binary(Topic) -> - Record = #mqtt_topic{topic = Topic, node = node()}, - case mnesia:transaction(fun add_topic/1, [Record]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end; - -create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) -> - case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, ?QOS_I(Qos)}]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end. - -%% @doc Lookup Topic or Subscription. --spec lookup(topic, emqttd_topic:topic()) -> list(mqtt_topic()); - (subscription, binary()) -> list(mqtt_subscription()). -lookup(topic, Topic) when is_binary(Topic) -> - mnesia:dirty_read(topic, Topic); - -lookup(subscription, SubId) when is_binary(SubId) -> - mnesia:dirty_read(subscription, SubId). - -%% @doc Delete Topic or Subscription. --spec delete(topic, emqttd_topic:topic()) -> ok | {error, any()}; - (subscription, binary() | {binary(), emqttd_topic:topic()}) -> ok. -delete(topic, _Topic) -> - {error, unsupported}; - -delete(subscription, SubId) when is_binary(SubId) -> - mnesia:dirty_delete({subscription, SubId}); - -delete(subscription, {SubId, Topic}) when is_binary(SubId) andalso is_binary(Topic) -> - mnesia:async_dirty(fun remove_subscriptions/2, [SubId, [Topic]]). - -%% @doc Subscribe Topics --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> - {ok, Qos | list(Qos)} | {error, any()} when - Topic :: binary(), - Qos :: mqtt_qos() | mqtt_qos_name(). -subscribe({Topic, Qos}) -> - subscribe([{Topic, Qos}]); -subscribe(TopicTable) when is_list(TopicTable) -> - call({subscribe, {undefined, self()}, fixqos(TopicTable)}). - --spec subscribe(ClientId, {Topic, Qos} | list({Topic, Qos})) -> - {ok, Qos | list(Qos)} | {error, any()} when - ClientId :: binary(), - Topic :: binary(), - Qos :: mqtt_qos() | mqtt_qos_name(). -subscribe(ClientId, {Topic, Qos}) when is_binary(ClientId) -> - subscribe(ClientId, [{Topic, Qos}]); -subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTable) -> - call({subscribe, {ClientId, self()}, fixqos(TopicTable)}). - -fixqos(TopicTable) -> - [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable]. - -%% @doc Unsubscribe Topic or Topics --spec unsubscribe(emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. -unsubscribe(Topic) when is_binary(Topic) -> - unsubscribe([Topic]); -unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> - cast({unsubscribe, {undefined, self()}, Topics}). - --spec unsubscribe(binary(), emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. -unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> - unsubscribe(ClientId, [Topic]); -unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) -> - cast({unsubscribe, {ClientId, self()}, Topics}). - -call(Request) -> - gen_server2:call(pick(self()), Request, infinity). - -cast(Msg) -> - gen_server2:cast(pick(self()), Msg). - -pick(Self) -> gproc_pool:pick_worker(pubsub, Self). - -%% @doc Publish to cluster nodes --spec publish(Msg :: mqtt_message()) -> ok. -publish(Msg = #mqtt_message{from = From}) -> - trace(publish, From, Msg), - Msg1 = #mqtt_message{topic = To} - = emqttd_broker:foldl_hooks('message.publish', [], Msg), - - %% Retain message first. Don't create retained topic. - case emqttd_retainer:retain(Msg1) of - ok -> - %% TODO: why unset 'retain' flag? - publish(To, emqttd_message:unset_flag(Msg1)); - ignore -> - publish(To, Msg1) - end. - -publish(To, Msg) -> - lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) -> - case Node =:= node() of - true -> ?ROUTER:route(Topic, Msg); - false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg]) - end - end, match(To)). - -%% @doc Match Topic Name with Topic Filters --spec match(emqttd_topic:topic()) -> [mqtt_topic()]. -match(To) -> - Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), - %% ets:lookup for topic table will be replicated to all nodes. - lists:append([ets:lookup(topic, Topic) || Topic <- [To | Matched]]). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id, StatsFun, _Opts]) -> - ?GPROC_POOL(join, Pool, Id), - {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}. - -handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, - State = #state{statsfun = StatsFun}) -> - - %% Monitor SubPid first - try_monitor(SubPid), - - %% Topics - Topics = [Topic || {Topic, _Qos} <- TopicTable], - - NewTopics = Topics -- reverse_routes(SubPid), - - %% Add routes - ?ROUTER:add_routes(NewTopics, SubPid), - - insert_reverse_routes(SubPid, NewTopics), - - StatsFun(reverse_route), - - %% Insert topic records to mnesia - Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics], - - case mnesia:transaction(fun add_topics/1, [Records]) of - {atomic, _} -> - StatsFun(topic), - if_subscription( - fun(_) -> - %% Add subscriptions - Args = [fun add_subscriptions/2, [SubId, TopicTable]], - emqttd_pooler:async_submit({mnesia, async_dirty, Args}), - StatsFun(subscription) - end), - %% Grant all qos... - {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; - {aborted, Error} -> - {reply, {error, Error}, State} - end; - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> - - %% Delete routes first - ?ROUTER:delete_routes(Topics, SubPid), - - delete_reverse_routes(SubPid, Topics), - - StatsFun(reverse_route), - - %% Remove subscriptions - if_subscription( - fun(_) -> - Args = [fun remove_subscriptions/2, [SubId, Topics]], - emqttd_pooler:async_submit({mnesia, async_dirty, Args}), - StatsFun(subscription) - end), - - {noreply, State}; - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{statsfun = StatsFun}) -> - - Topics = reverse_routes(DownPid), - - ?ROUTER:delete_routes(Topics, DownPid), - - delete_reverse_routes(DownPid), - - StatsFun(reverse_route), - - {noreply, State, hibernate}; - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -add_topics(Records) -> - lists:foreach(fun add_topic/1, Records). - -add_topic(TopicR = #mqtt_topic{topic = Topic}) -> - case mnesia:wread({topic, Topic}) of - [] -> - case emqttd_topic:wildcard(Topic) of - true -> emqttd_trie:insert(Topic); - false -> ok - end, - mnesia:write(topic, TopicR, write); - Records -> - case lists:member(TopicR, Records) of - true -> ok; - false -> mnesia:write(topic, TopicR, write) - end - end. - -add_subscriptions(undefined, _TopicTable) -> - ok; -add_subscriptions(SubId, TopicTable) -> - lists:foreach(fun({Topic, Qos}) -> - add_subscription(SubId, {Topic, Qos}) - end,TopicTable). - -add_subscription(SubId, {Topic, Qos}) -> - Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos}, - Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, - Records = mnesia:match_object(subscription, Pattern, write), - case lists:member(Subscription, Records) of - true -> - ok; - false -> - [delete_subscription(Record) || Record <- Records], - insert_subscription(Subscription) - end. - -insert_subscription(Record) -> - mnesia:write(subscription, Record, write). - -remove_subscriptions(undefined, _Topics) -> - ok; -remove_subscriptions(SubId, Topics) -> - lists:foreach(fun(Topic) -> - Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}, - Records = mnesia:match_object(subscription, Pattern, write), - lists:foreach(fun delete_subscription/1, Records) - end, Topics). - -delete_subscription(Record) -> - mnesia:delete_object(subscription, Record, write). - -reverse_routes(SubPid) -> - case ets:member(reverse_route, SubPid) of - true -> - try ets:lookup_element(reverse_route, SubPid, 2) catch error:badarg -> [] end; - false -> - [] - end. - -insert_reverse_routes(SubPid, Topics) -> - ets:insert(reverse_route, [{SubPid, Topic} || Topic <- Topics]). - -delete_reverse_routes(SubPid, Topics) -> - lists:foreach(fun(Topic) -> - ets:delete_object(reverse_route, {SubPid, Topic}) - end, Topics). - -delete_reverse_routes(SubPid) -> - ets:delete(reverse_route, SubPid). - -try_monitor(SubPid) -> - case ets:member(reverse_route, SubPid) of - true -> ignore; - false -> erlang:monitor(process, SubPid) - end. - -%%-------------------------------------------------------------------- -%% Trace Functions -%%-------------------------------------------------------------------- - -trace(publish, From, _Msg) when is_atom(From) -> - %% Dont' trace '$SYS' publish - ignore; - -trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> - lager:info([{client, From}, {topic, Topic}], - "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). - diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index c04b486f6..28ff8033b 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -21,8 +21,6 @@ -include("emqttd.hrl"). --define(HELPER, emqttd_pubsub_helper). - -define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). %% API @@ -38,33 +36,37 @@ pubsub_pool() -> hd([Pid|| {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). init([Env]) -> - %% Create tabs - create_tab(route), create_tab(reverse_route), - %% PubSub Helper - Helper = {helper, {?HELPER, start_link, [fun setstats/1]}, - permanent, infinity, worker, [?HELPER]}, + %% Create ETS Tabs + create_tab(subscriber), create_tab(subscribed), - %% Router Pool Sup - RouterMFA = {emqttd_router, start_link, [fun setstats/1, Env]}, - - %% Pool_size / 2 - RouterSup = emqttd_pool_sup:spec(router_pool, [router, hash, router_pool(Env), RouterMFA]), + %% Router + Router = {router, {emqttd_router, start_link, []}, + permanent, 5000, worker, [emqttd_router]}, %% PubSub Pool Sup - PubSubMFA = {emqttd_pubsub, start_link, [fun setstats/1, Env]}, - PubSubSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]), + PubSubMFA = {emqttd_pubsub, start_link, [Env]}, + PubSubPoolSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]), - {ok, {{one_for_all, 10, 60}, [Helper, RouterSup, PubSubSup]}}. + %% Server Pool Sup + ServerMFA = {emqttd_server, start_link, [Env]}, + ServerPoolSup = emqttd_pool_sup:spec(server_pool, [server, hash, pool_size(Env), ServerMFA]), -create_tab(route) -> - %% Route Table: Topic -> Pid1, Pid2, ..., PidN + {ok, {{one_for_all, 5, 60}, [Router, PubSubPoolSup, ServerPoolSup]}}. + +pool_size(Env) -> + Schedulers = erlang:system_info(schedulers), + proplists:get_value(pool_size, Env, Schedulers). + +create_tab(subscriber) -> + %% subscriber: Topic -> Pid1, Pid2, ..., PidN %% duplicate_bag: o(1) insert - ensure_tab(route, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + ensure_tab(subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); -create_tab(reverse_route) -> - %% Reverse Route Table: Pid -> Topic1, Topic2, ..., TopicN - ensure_tab(reverse_route, [public, named_table, bag | ?CONCURRENCY_OPTS]). +create_tab(subscribed) -> + %% subscribed: Pid -> Topic1, Topic2, ..., TopicN + %% bag: o(n) insert + ensure_tab(subscribed, [public, named_table, bag | ?CONCURRENCY_OPTS]). ensure_tab(Tab, Opts) -> case ets:info(Tab, name) of @@ -72,26 +74,3 @@ ensure_tab(Tab, Opts) -> _ -> ok end. -router_pool(Env) -> - case pool_size(Env) div 2 of - 0 -> 1; - I -> I - end. - -pool_size(Env) -> - Schedulers = erlang:system_info(schedulers), - proplists:get_value(pool_size, Env, Schedulers). - -setstats(route) -> - emqttd_stats:setstat('routes/count', ets:info(route, size)); - -setstats(reverse_route) -> - emqttd_stats:setstat('routes/reverse', ets:info(reverse_route, size)); - -setstats(topic) -> - emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); - -setstats(subscription) -> - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', - mnesia:table_info(subscription, size)). - diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl new file mode 100644 index 000000000..ac744e94a --- /dev/null +++ b/src/emqttd_router.erl @@ -0,0 +1,225 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_router). + +-behaviour(gen_server). + +-include("emqttd.hrl"). + +%% Mnesia Bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([start_link/0, stop/0]). + +-export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1, + del_route/1, del_route/2, del_routes/1, has_route/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). + +%%-------------------------------------------------------------------- +%% Mnesia Bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(route, [ + {type, bag}, + {ram_copies, [node()]}, + {record_name, mqtt_route}, + {attributes, record_info(fields, mqtt_route)}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(route, ram_copies). + +%%-------------------------------------------------------------------- +%% Start the Router +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +%% @doc Lookup Routes. +-spec lookup(Topic:: binary()) -> [mqtt_route()]. +lookup(Topic) when is_binary(Topic) -> + Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), + %% Optimize: route table will be replicated to all nodes. + lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). + +%% @doc Print Routes. +-spec print(Topic :: binary()) -> [ok]. +print(Topic) -> + [io:format("~s -> ~s~n", [To, Node]) || + #mqtt_route{topic = To, node = Node} <- lookup(Topic)]. + +%% @doc Add Route +-spec add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}. +add_route(Topic) when is_binary(Topic) -> + add_route(#mqtt_route{topic = Topic, node = node()}); +add_route(Route) when is_record(Route, mqtt_route) -> + add_routes([Route]). + +-spec add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}. +add_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> + add_route(#mqtt_route{topic = Topic, node = Node}). + +%% @doc Add Routes +-spec add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}. +add_routes(Routes) -> + Add = fun() -> [add_route_(Route) || Route <- Routes] end, + case mnesia:transaction(Add) of + {atomic, _} -> update_stats_(), ok; + {aborted, Error} -> {error, Error} + end. + +%% @private +add_route_(Route = #mqtt_route{topic = Topic}) -> + case mnesia:wread({route, Topic}) of + [] -> + case emqttd_topic:wildcard(Topic) of + true -> emqttd_trie:insert(Topic); + false -> ok + end, + mnesia:write(route, Route, write); + Records -> + case lists:member(Route, Records) of + true -> ok; + false -> mnesia:write(route, Route, write) + end + end. + +%% @doc Delete Route +-spec del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}. +del_route(Topic) when is_binary(Topic) -> + del_route(#mqtt_route{topic = Topic, node = node()}); +del_route(Route) when is_record(Route, mqtt_route) -> + del_routes([Route]). + +-spec del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}. +del_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> + del_route(#mqtt_route{topic = Topic, node = Node}). + +%% @doc Delete Routes +-spec del_routes([mqtt_route()]) -> ok | {error, any()}. +del_routes(Routes) -> + Del = fun() -> [del_route_(Route) || Route <- Routes] end, + case mnesia:transaction(Del) of + {atomic, _} -> update_stats_(), ok; + {aborted, Error} -> {error, Error} + end. + +del_route_(Route = #mqtt_route{topic = Topic}) -> + case mnesia:wread({route, Topic}) of + [] -> + ok; + [Route] -> + %% Remove route and trie + mnesia:delete_object(route, Route, write), + case emqttd_topic:wildcard(Topic) of + true -> emqttd_trie:delete(Topic); + false -> ok + end; + _More -> + %% Remove route only + mnesia:delete_object(route, Route, write) + end. + +%% @doc Has Route? +-spec has_route(binary()) -> boolean(). +has_route(Topic) -> + Routes = case mnesia:is_transaction() of + true -> mnesia:read(route, Topic); + false -> mnesia:dirty_read(route, Topic) + end, + length(Routes) > 0. + +stop() -> gen_server:call(?MODULE, stop). + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- + +init([]) -> + mnesia:subscribe(system), + {ok, #state{}}. + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; + +handle_call(_Req, _From, State) -> + {reply, ignore, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({mnesia_system_event, {mnesia_up, Node}}, State) -> + lager:error("Mnesia up: ~p~n", [Node]), + {noreply, State}; + +handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + lager:error("Mnesia down: ~p~n", [Node]), + clean_routes_(Node), + update_stats_(), + {noreply, State}; + +handle_info({mnesia_system_event, {inconsistent_database, Context, Node}}, State) -> + %% 1. Backup and restart + %% 2. Set master nodes + lager:critical("Mnesia inconsistent_database event: ~p, ~p~n", [Context, Node]), + {noreply, State}; + +handle_info({mnesia_system_event, {mnesia_overload, Details}}, State) -> + lager:critical("Mnesia overload: ~p~n", [Details]), + {noreply, State}; + +handle_info({mnesia_system_event, _Event}, State) -> + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + mnesia:unsubscribe(system). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +%% Clean Routes on Node +clean_routes_(Node) -> + Pattern = #mqtt_route{_ = '_', node = Node}, + Clean = fun() -> + [mnesia:delete_object(route, R, write) || + R <- mnesia:match_object(route, Pattern, write)] + end, + mnesia:transaction(Clean). + +update_stats_() -> + emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(route, size)). + diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl new file mode 100644 index 000000000..e119a5029 --- /dev/null +++ b/src/emqttd_server.erl @@ -0,0 +1,254 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_server). + +-behaviour(gen_server2). + +-include("emqttd.hrl"). + +-include("emqttd_protocol.hrl"). + +-include("emqttd_internal.hrl"). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% API Exports +-export([start_link/3]). + +%% PubSub API +-export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3, + update_subscription/4]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {pool, id, env, monitors}). + +%%-------------------------------------------------------------------- +%% Mnesia callbacks +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(subscription, [ + {type, bag}, + {ram_copies, [node()]}, + {local_content, true}, %% subscription table is local + {record_name, mqtt_subscription}, + {attributes, record_info(fields, mqtt_subscription)}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(subscription). + +%%-------------------------------------------------------------------- +%% Start server +%%-------------------------------------------------------------------- + +%% @doc Start a Server +-spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), + Env :: list(tuple()). +start_link(Pool, Id, Env) -> + gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). + +%%-------------------------------------------------------------------- +%% PubSub API +%%-------------------------------------------------------------------- + +%% @doc Subscribe a Topic +-spec subscribe(binary()) -> ok. +subscribe(Topic) when is_binary(Topic) -> + From = self(), call(server(From), {subscribe, From, Topic}). + +%% @doc Subscribe from a MQTT session. +-spec subscribe(binary(), binary(), mqtt_qos()) -> ok. +subscribe(ClientId, Topic, Qos) -> + From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). + +%% @doc Update a subscription. +-spec update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok. +update_subscription(ClientId, Topic, OldQos, NewQos) -> + call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}). + +%% @doc Publish a Message +-spec publish(Msg :: mqtt_message()) -> ok. +publish(Msg = #mqtt_message{from = From}) -> + trace(publish, From, Msg), + Msg1 = #mqtt_message{topic = Topic} + = emqttd_broker:foldl_hooks('message.publish', [], Msg), + %% Retain message first. Don't create retained topic. + Msg2 = case emqttd_retainer:retain(Msg1) of + ok -> emqttd_message:unset_flag(Msg1); + ignore -> Msg1 + end, + emqttd_pubsub:publish(Topic, Msg2). + +%% @doc Unsubscribe a Topic +-spec unsubscribe(binary()) -> ok. +unsubscribe(Topic) when is_binary(Topic) -> + From = self(), call(server(From), {unsubscribe, From, Topic}). + +%% @doc Unsubscribe a Topic from a MQTT session +-spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok. +unsubscribe(ClientId, Topic, Qos) -> + From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}). + +call(Server, Req) -> + gen_server2:call(Server, Req, infinity). + +server(From) -> + gproc_pool:pick_worker(server, From). + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id, Env]) -> + ?GPROC_POOL(join, Pool, Id), + {ok, #state{pool = Pool, id = Id, env = Env, monitors = dict:new()}}. + +handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> + add_subscription_(ClientId, Topic, Qos), + set_subscription_stats(), + do_subscribe_(SubPid, Topic), + ok(monitor_subscriber_(ClientId, SubPid, State)); + +handle_call({subscribe, SubPid, Topic}, _From, State) -> + do_subscribe_(SubPid, Topic), + ok(monitor_subscriber_(undefined, SubPid, State)); + +handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> + OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, + NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, + mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), + set_subscription_stats(), ok(State); + +handle_call({unsubscribe, SubPid, ClientId, Topic, Qos}, From, State) -> + del_subscription_(ClientId, Topic, Qos), + set_subscription_stats(), + handle_call({unsubscribe, SubPid, Topic}, From, State); + +handle_call({unsubscribe, SubPid, Topic}, _From, State) -> + emqttd_pubsub:unsubscribe(Topic, SubPid), + ets:delete_object(subscribed, {SubPid, Topic}), + ok(State); + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{monitors = Monitors}) -> + %% unsubscribe + lists:foreach(fun({_, Topic}) -> + emqttd_pubsub:async_unsubscribe(Topic, DownPid) + end, ets:lookup(subscribed, DownPid)), + ets:delete(subscribed, DownPid), + + %% clean subscriptions + case dict:find(DownPid, Monitors) of + {ok, {undefined, _}} -> ok; + {ok, {ClientId, _}} -> mnesia:dirty_delete(subscription, ClientId); + error -> ok + end, + {noreply, State#state{monitors = dict:erase(DownPid, Monitors)}}; + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +%% @private +%% @doc Add a subscription. +-spec add_subscription_(binary(), binary(), mqtt_qos()) -> ok. +add_subscription_(ClientId, Topic, Qos) -> + add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). + +-spec add_subscription_(mqtt_subscription()) -> ok. +add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> + mnesia:dirty_write(subscription, Subscription). + +update_subscription_(OldSub, NewSub) -> + mnesia:delete_object(subscription, OldSub, write), + mnesia:write(subscription, NewSub, write). + +%% @private +%% @doc Delete a subscription +-spec del_subscription_(binary(), binary(), mqtt_qos()) -> ok. +del_subscription_(ClientId, Topic, Qos) -> + del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). + +del_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> + mnesia:dirty_delete_object(subscription, Subscription). + +%% @private +%% @doc Call pubsub to subscribe +do_subscribe_(SubPid, Topic) -> + case ets:match(subscribed, {SubPid, Topic}) of + [] -> + emqttd_pubsub:subscribe(Topic, SubPid), + ets:insert(subscribed, {SubPid, Topic}); + [_] -> + false + end. + +monitor_subscriber_(ClientId, SubPid, State = #state{monitors = Monitors}) -> + case dict:find(SubPid, Monitors) of + {ok, _} -> + State; + error -> + MRef = erlang:monitor(process, SubPid), + State#state{monitors = dict:store(SubPid, {ClientId, MRef}, Monitors)} + end. + +%%-------------------------------------------------------------------- +%% Trace Functions +%%-------------------------------------------------------------------- + +trace(publish, From, _Msg) when is_atom(From) -> + %% Dont' trace '$SYS' publish + ignore; + +trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> + lager:info([{client, From}, {topic, Topic}], + "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + +%%-------------------------------------------------------------------- +%% Subscription Statistics +%%-------------------------------------------------------------------- + +set_subscription_stats() -> + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)). + +%%-------------------------------------------------------------------- + +ok(State) -> {reply, ok, State}. + diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 9c28090bf..9deccac52 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -77,9 +77,7 @@ publish_log(Message, State = #state{formatter = Formatter, format_config = FormatConfig}) -> Severity = lager_msg:severity(Message), Payload = Formatter:format(Message, FormatConfig), - emqttd_pubsub:publish( - emqttd_message:make( - log, topic(Severity), iolist_to_binary(Payload))), + emqttd:publish(emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload))), {ok, State}. topic(Severity) ->