publish(To, ...

This commit is contained in:
Feng 2015-12-07 21:35:06 +08:00
parent 3a8ada21e0
commit 00f362fabc
1 changed files with 23 additions and 18 deletions

View File

@ -211,35 +211,35 @@ cast(Msg) ->
-spec publish(Msg :: mqtt_message()) -> ok. -spec publish(Msg :: mqtt_message()) -> ok.
publish(Msg = #mqtt_message{from = From}) -> publish(Msg = #mqtt_message{from = From}) ->
trace(publish, From, Msg), trace(publish, From, Msg),
Msg1 = #mqtt_message{topic = Topic} Msg1 = #mqtt_message{topic = To}
= emqttd_broker:foldl_hooks('message.publish', [], Msg), = emqttd_broker:foldl_hooks('message.publish', [], Msg),
%% Retain message first. Don't create retained topic. %% Retain message first. Don't create retained topic.
case emqttd_retainer:retain(Msg1) of case emqttd_retainer:retain(Msg1) of
ok -> ok ->
%% TODO: why unset 'retain' flag? %% TODO: why unset 'retain' flag?
publish(Topic, emqttd_message:unset_flag(Msg1)); publish(To, emqttd_message:unset_flag(Msg1));
ignore -> ignore ->
publish(Topic, Msg1) publish(To, Msg1)
end. end.
publish(Topic, Msg) when is_binary(Topic) -> publish(To, Msg) ->
lists:foreach(fun(#mqtt_topic{topic = Name, node = Node}) -> lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) ->
case Node =:= node() of case Node =:= node() of
true -> ?ROUTER:route(Name, Msg); true -> ?ROUTER:route(Topic, Msg);
false -> rpc:cast(Node, ?ROUTER, route, [Name, Msg]) false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg])
end end
end, match(Topic)). end, match(To)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Match Topic Name with Topic Filters %% @doc Match Topic Name with Topic Filters
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec match(Topic :: binary()) -> [mqtt_topic()]. -spec match(binary()) -> [mqtt_topic()].
match(Topic) when is_binary(Topic) -> match(To) ->
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
%% ets:lookup for topic table will be copied. %% ets:lookup for topic table will be replicated.
lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]). lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -252,13 +252,16 @@ init([Pool, Id, StatsFun, Opts]) ->
handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
State = #state{statsfun = StatsFun}) -> State = #state{statsfun = StatsFun}) ->
Topics = [Topic || {Topic, _Qos} <- TopicTable],
%% Add routes first %% Add routes first
?ROUTER:add_routes(TopicTable, SubPid), ?ROUTER:add_routes(Topics, SubPid),
%% Add topics %% Insert topic records to global topic table
Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics],
case mnesia:transaction(fun add_topics/1, [Topics]) of case mnesia:transaction(fun add_topics/1, [Records]) of
{atomic, _} -> {atomic, _} ->
StatsFun(topic), StatsFun(topic),
if_subscription( if_subscription(
@ -268,6 +271,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
emqttd_pooler:async_submit({mnesia, async_dirty, Args}), emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
StatsFun(subscription) StatsFun(subscription)
end), end),
%% Grant all qos...
{reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
{aborted, Error} -> {aborted, Error} ->
{reply, {error, Error}, State} {reply, {error, Error}, State}
@ -293,12 +297,13 @@ handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State). ?UNEXPECTED_MSG(Msg, State).
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
Routes = ?ROUTER:lookup_routes(DownPid), Routes = ?ROUTER:lookup_routes(DownPid),
%% Delete all routes of the process %% Delete all routes of the process
?ROUTER:delete_routes(DownPid), ?ROUTER:delete_routes(DownPid),
?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]), ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]),
{noreply, State, hibernate}; {noreply, State, hibernate};