diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1d528402f..ee4b5968b 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc emqttd pubsub +%%% @doc PubSub %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -43,9 +43,7 @@ -export([start_link/4]). -export([create/2, lookup/2, subscribe/1, subscribe/2, - unsubscribe/1, unsubscribe/2, publish/1, delete/2]). - -%% Subscriptions API + publish/1, unsubscribe/1, unsubscribe/2, delete/2]). %% Local node -export([match/1]). @@ -62,8 +60,6 @@ -define(ROUTER, emqttd_router). --define(HELPER, emqttd_pubsub_helper). - %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= @@ -123,14 +119,11 @@ cache_env(Key) -> %%% API %%%============================================================================= -%%------------------------------------------------------------------------------ %% @doc Start one pubsub server -%% @end -%%------------------------------------------------------------------------------ -spec start_link(Pool, Id, StatsFun, Opts) -> {ok, pid()} | ignore | {error, any()} when Pool :: atom(), Id :: pos_integer(), - StatsFun :: fun(), + StatsFun :: fun((atom()) -> any()), Opts :: list(tuple()). start_link(Pool, Id, StatsFun, Opts) -> gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Opts], []). @@ -138,11 +131,9 @@ start_link(Pool, Id, StatsFun, Opts) -> name(Id) -> list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). -%%------------------------------------------------------------------------------ %% @doc Create Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ --spec create(topic | subscription, binary() | {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}. +-spec create(topic, emqttd_topic:topic()) -> ok | {error, any()}; + (subscription, {binary(), binary(), mqtt_qos()}) -> ok | {error, any()}. create(topic, Topic) when is_binary(Topic) -> Record = #mqtt_topic{topic = Topic, node = node()}, case mnesia:transaction(fun add_topic/1, [Record]) of @@ -151,39 +142,33 @@ create(topic, Topic) when is_binary(Topic) -> end; create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) -> - case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of + case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, ?QOS_I(Qos)}]) of {atomic, ok} -> ok; {aborted, Error} -> {error, Error} end. -%%------------------------------------------------------------------------------ %% @doc Lookup Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ --spec lookup(topic | subscription, binary()) -> list(). -lookup(topic, Topic) -> +-spec lookup(topic, emqttd_topic:topic()) -> list(mqtt_topic()); + (subscription, binary()) -> list(mqtt_subscription()). +lookup(topic, Topic) when is_binary(Topic) -> mnesia:dirty_read(topic, Topic); -lookup(subscription, ClientId) -> - mnesia:dirty_read(subscription, ClientId). +lookup(subscription, SubId) when is_binary(SubId) -> + mnesia:dirty_read(subscription, SubId). -%%------------------------------------------------------------------------------ %% @doc Delete Topic or Subscription. -%% @end -%%------------------------------------------------------------------------------ +-spec delete(topic, emqttd_topic:topic()) -> ok | {error, any()}; + (subscription, binary() | {binary(), emqttd_topic:topic()}) -> ok. delete(topic, _Topic) -> {error, unsupported}; -delete(subscription, ClientId) when is_binary(ClientId) -> - mnesia:dirty_delete({subscription, ClientId}); +delete(subscription, SubId) when is_binary(SubId) -> + mnesia:dirty_delete({subscription, SubId}); -delete(subscription, {ClientId, Topic}) when is_binary(ClientId) -> - mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]). +delete(subscription, {SubId, Topic}) when is_binary(SubId) andalso is_binary(Topic) -> + mnesia:async_dirty(fun remove_subscriptions/2, [SubId, [Topic]]). -%%------------------------------------------------------------------------------ %% @doc Subscribe Topics -%% @end -%%------------------------------------------------------------------------------ -spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), @@ -206,34 +191,28 @@ subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTa fixqos(TopicTable) -> [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable]. -call(Request) -> - PubSub = gproc_pool:pick_worker(pubsub, self()), - gen_server2:call(PubSub, Request, infinity). - -%%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics -%% @end -%%------------------------------------------------------------------------------ --spec unsubscribe(binary() | list(binary())) -> ok. +-spec unsubscribe(emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. unsubscribe(Topic) when is_binary(Topic) -> unsubscribe([Topic]); unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> cast({unsubscribe, {undefined, self()}, Topics}). --spec unsubscribe(binary(), binary() | list(binary())) -> ok. +-spec unsubscribe(binary(), emqttd_topic:topic() | list(emqttd_topic:topic())) -> ok. unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> unsubscribe(ClientId, [Topic]); unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) -> cast({unsubscribe, {ClientId, self()}, Topics}). -cast(Msg) -> - PubSub = gproc_pool:pick_worker(pubsub, self()), - gen_server2:cast(PubSub, Msg). +call(Request) -> + gen_server2:call(pick(self()), Request, infinity). + +cast(Msg) -> + gen_server2:cast(pick(self()), Msg). + +pick(Self) -> gproc_pool:pick_worker(pubsub, Self). -%%------------------------------------------------------------------------------ %% @doc Publish to cluster nodes -%% @end -%%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), @@ -257,35 +236,41 @@ publish(To, Msg) -> end end, match(To)). -%%------------------------------------------------------------------------------ %% @doc Match Topic Name with Topic Filters -%% @end -%%------------------------------------------------------------------------------ --spec match(binary()) -> [mqtt_topic()]. +-spec match(emqttd_topic:topic()) -> [mqtt_topic()]. match(To) -> MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), - %% ets:lookup for topic table will be replicated. + %% ets:lookup for topic table will be replicated to all nodes. lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Pool, Id, StatsFun, Opts]) -> - ?ROUTER:init(Opts), +init([Pool, Id, StatsFun, _Opts]) -> ?GPROC_POOL(join, Pool, Id), {ok, #state{pool = Pool, id = Id, statsfun = StatsFun}}. handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State = #state{statsfun = StatsFun}) -> + %% Monitor SubPid first + try_monitor(SubPid), + + %% Topics Topics = [Topic || {Topic, _Qos} <- TopicTable], - %% Add routes first - ?ROUTER:add_routes(Topics, SubPid), + NewTopics = Topics -- reverse_routes(SubPid), - %% Insert topic records to global topic table - Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics], + %% Add routes + ?ROUTER:add_routes(NewTopics, SubPid), + + insert_reverse_routes(SubPid, NewTopics), + + StatsFun(route), + + %% Insert topic records to mnesia + Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- NewTopics], case mnesia:transaction(fun add_topics/1, [Records]) of {atomic, _} -> @@ -307,9 +292,12 @@ handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> + %% Delete routes first ?ROUTER:delete_routes(Topics, SubPid), + delete_reverse_routes(SubPid, Topics), + %% Remove subscriptions if_subscription( fun(_) -> @@ -324,12 +312,11 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> - Routes = ?ROUTER:lookup_routes(DownPid), + Topics = reverse_routes(DownPid), - %% Delete all routes of the process - ?ROUTER:delete_routes(DownPid), + ?ROUTER:delete_routes(Topics, DownPid), - ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]), + delete_reverse_routes(DownPid), {noreply, State, hibernate}; @@ -395,6 +382,31 @@ remove_subscriptions(SubId, Topics) -> delete_subscription(Record) -> mnesia:delete_object(subscription, Record, write). +reverse_routes(SubPid) -> + case ets:member(reverse_route, SubPid) of + true -> + try ets:lookup_element(reverse_route, SubPid, 2) catch error:badarg -> [] end; + false -> + [] + end. + +insert_reverse_routes(SubPid, Topics) -> + ets:insert(reverse_route, [{SubPid, Topic} || Topic <- Topics]). + +delete_reverse_routes(SubPid, Topics) -> + lists:foreach(fun(Topic) -> + ets:delete_object(reverse_route, {SubPid, Topic}) + end, Topics). + +delete_reverse_routes(SubPid) -> + ets:delete(reverse_route, SubPid). + +try_monitor(SubPid) -> + case ets:member(reverse_route, SubPid) of + true -> ignore; + false -> erlang:monitor(process, SubPid) + end. + %%%============================================================================= %%% Trace Functions %%%=============================================================================