diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index e7bc50124..5fa98f413 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -418,7 +418,8 @@ validate_frame(?UNSUBSCRIBE, Frame) -> validate_frame(?SUBSCRIBE, Frame); validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> - ErrTopics = [Topic || Topic <- Topics, not emqtt_topic:validate({subscribe, Topic})], + ErrTopics = [Topic || #mqtt_topic{name=Topic} <- Topics, + not emqtt_topic:validate({subscribe, Topic})], case ErrTopics of [] -> ok; _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 98e503d88..dd314f81c 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -24,6 +24,7 @@ -export([topics/0, subscribe/2, unsubscribe/2, + publish/1, publish/2, route/2, match/1, @@ -52,6 +53,9 @@ subscribe({Topic, Qos}, Client) when is_pid(Client) -> unsubscribe(Topic, Client) when is_list(Topic) and is_pid(Client) -> gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). +publish(Msg=#mqtt_msg{topic=Topic}) -> + publish(Topic, Msg). + %publish to cluster node. publish(Topic, Msg) when is_list(Topic) and is_record(Msg, mqtt_msg) -> lists:foreach(fun(#topic{name=Name, node=Node}) ->