diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 235d78be3..96851ae79 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -211,35 +211,35 @@ cast(Msg) -> -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), - Msg1 = #mqtt_message{topic = Topic} + Msg1 = #mqtt_message{topic = To} = emqttd_broker:foldl_hooks('message.publish', [], Msg), %% Retain message first. Don't create retained topic. case emqttd_retainer:retain(Msg1) of ok -> %% TODO: why unset 'retain' flag? - publish(Topic, emqttd_message:unset_flag(Msg1)); + publish(To, emqttd_message:unset_flag(Msg1)); ignore -> - publish(Topic, Msg1) + publish(To, Msg1) end. -publish(Topic, Msg) when is_binary(Topic) -> - lists:foreach(fun(#mqtt_topic{topic = Name, node = Node}) -> +publish(To, Msg) -> + lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) -> case Node =:= node() of - true -> ?ROUTER:route(Name, Msg); - false -> rpc:cast(Node, ?ROUTER, route, [Name, Msg]) + true -> ?ROUTER:route(Topic, Msg); + false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg]) end - end, match(Topic)). + end, match(To)). %%------------------------------------------------------------------------------ %% @doc Match Topic Name with Topic Filters %% @end %%------------------------------------------------------------------------------ --spec match(Topic :: binary()) -> [mqtt_topic()]. -match(Topic) when is_binary(Topic) -> - MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), - %% ets:lookup for topic table will be copied. - lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]). +-spec match(binary()) -> [mqtt_topic()]. +match(To) -> + MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), + %% ets:lookup for topic table will be replicated. + lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks @@ -252,13 +252,16 @@ init([Pool, Id, StatsFun, Opts]) -> handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State = #state{statsfun = StatsFun}) -> + + Topics = [Topic || {Topic, _Qos} <- TopicTable], + %% Add routes first - ?ROUTER:add_routes(TopicTable, SubPid), + ?ROUTER:add_routes(Topics, SubPid), - %% Add topics - Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], + %% Insert topic records to global topic table + Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics], - case mnesia:transaction(fun add_topics/1, [Topics]) of + case mnesia:transaction(fun add_topics/1, [Records]) of {atomic, _} -> StatsFun(topic), if_subscription( @@ -268,6 +271,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, emqttd_pooler:async_submit({mnesia, async_dirty, Args}), StatsFun(subscription) end), + %% Grant all qos... {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {aborted, Error} -> {reply, {error, Error}, State} @@ -293,12 +297,13 @@ handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + Routes = ?ROUTER:lookup_routes(DownPid), %% Delete all routes of the process ?ROUTER:delete_routes(DownPid), - ?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]), + ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]), {noreply, State, hibernate};