diff --git a/src/emqttd.erl b/src/emqttd.erl index a741adc59..2518d39b3 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -99,8 +99,7 @@ subscribe(Topic, Subscriber, Options) -> %% @doc Publish MQTT Message -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Msg = #mqtt_message{topic = Topic}) -> - emqttd_server:publish(Topic, Msg). +publish(Msg) -> emqttd_server:publish(Msg). %% @doc Unsubscribe -spec(unsubscribe(iodata()) -> ok | pubsub_error()). diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 1cb68cf08..af1c1537d 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -52,6 +52,9 @@ async_subscribe(Topic, Subscriber) -> publish(Topic, Msg) -> route(emqttd_router:match(Topic), delivery(Msg)). +route([], _Delivery) -> + ignore; + %% Dispatch on the local node route([#mqtt_route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() -> diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index 9feb3eb98..c56ed2309 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -29,7 +29,7 @@ -export([start_link/3]). %% PubSub API. --export([subscribe/1, subscribe/2, subscribe/3, publish/2, +-export([subscribe/1, subscribe/2, subscribe/3, publish/1, unsubscribe/1, unsubscribe/2]). %% Async PubSub API. @@ -85,8 +85,8 @@ async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}). %% @doc Publish message to Topic. --spec(publish(binary(), any()) -> {ok, mqtt_delivery()} | ignore). -publish(Topic, Msg = #mqtt_message{from = From}) -> +-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). +publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), case emqttd_hook:run('message.publish', [], Msg) of {ok, Msg1 = #mqtt_message{topic = Topic}} -> diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 0be4524c7..43de1e91a 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -104,7 +104,7 @@ dispatch(ClientId, Topic, Msg) -> try ets:lookup_element(mqtt_local_session, ClientId, 2) of Pid -> Pid ! {dispatch, Topic, Msg} catch - error:badarg -> ok %%TODO: How?? + error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How?? end. call(SM, Req) ->