emqttd_server:publish/1

This commit is contained in:
Feng Lee 2016-08-10 22:22:12 +08:00
parent 0d16f766c3
commit e2dab894af
4 changed files with 8 additions and 6 deletions

View File

@ -99,8 +99,7 @@ subscribe(Topic, Subscriber, Options) ->
%% @doc Publish MQTT Message %% @doc Publish MQTT Message
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
publish(Msg = #mqtt_message{topic = Topic}) -> publish(Msg) -> emqttd_server:publish(Msg).
emqttd_server:publish(Topic, Msg).
%% @doc Unsubscribe %% @doc Unsubscribe
-spec(unsubscribe(iodata()) -> ok | pubsub_error()). -spec(unsubscribe(iodata()) -> ok | pubsub_error()).

View File

@ -52,6 +52,9 @@ async_subscribe(Topic, Subscriber) ->
publish(Topic, Msg) -> publish(Topic, Msg) ->
route(emqttd_router:match(Topic), delivery(Msg)). route(emqttd_router:match(Topic), delivery(Msg)).
route([], _Delivery) ->
ignore;
%% Dispatch on the local node %% Dispatch on the local node
route([#mqtt_route{topic = To, node = Node}], route([#mqtt_route{topic = To, node = Node}],
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->

View File

@ -29,7 +29,7 @@
-export([start_link/3]). -export([start_link/3]).
%% PubSub API. %% PubSub API.
-export([subscribe/1, subscribe/2, subscribe/3, publish/2, -export([subscribe/1, subscribe/2, subscribe/3, publish/1,
unsubscribe/1, unsubscribe/2]). unsubscribe/1, unsubscribe/2]).
%% Async PubSub API. %% Async PubSub API.
@ -85,8 +85,8 @@ async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
%% @doc Publish message to Topic. %% @doc Publish message to Topic.
-spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
publish(Topic, Msg = #mqtt_message{from = From}) -> publish(Msg = #mqtt_message{from = From}) ->
trace(publish, From, Msg), trace(publish, From, Msg),
case emqttd_hook:run('message.publish', [], Msg) of case emqttd_hook:run('message.publish', [], Msg) of
{ok, Msg1 = #mqtt_message{topic = Topic}} -> {ok, Msg1 = #mqtt_message{topic = Topic}} ->

View File

@ -104,7 +104,7 @@ dispatch(ClientId, Topic, Msg) ->
try ets:lookup_element(mqtt_local_session, ClientId, 2) of try ets:lookup_element(mqtt_local_session, ClientId, 2) of
Pid -> Pid ! {dispatch, Topic, Msg} Pid -> Pid ! {dispatch, Topic, Msg}
catch catch
error:badarg -> ok %%TODO: How?? error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How??
end. end.
call(SM, Req) -> call(SM, Req) ->