diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index abca547c7..2a8859d0c 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -87,7 +87,13 @@ init([OriginConn, MqttEnv]) -> end, ConnName = esockd_net:format(PeerName), Self = self(), - SendFun = fun(Data) -> + + %%TODO: Send packet... + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + %%TODO: How to Log??? + ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}), + emqttd_metrics:inc('bytes/sent', size(Data)), try Connection:async_send(Data) of true -> ok catch diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index bfaee267f..3448ce387 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + username = Username, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), case emqttd_session:publish(Session, Msg) of ok -> @@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), - Data = emqttd_serializer:serialize(Packet), - ?LOG(debug, "SEND ~p", [Data], State), - emqttd_metrics:inc('bytes/sent', size(Data)), - SendFun(Data), + SendFun(Packet), {ok, State}. trace(recv, Packet, ProtoState) -> diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 6803c668c..d8f30d309 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -66,7 +66,12 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) -> {ok, Peername} = Req:get(peername), Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), - SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, + %% SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + emqttd_metrics:inc('bytes/sent', size(Data)), + ReplyChannel({binary, Data}) + end, ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, Headers} | MqttEnv]), {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index e8ecd2aea..69047c11c 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -22,6 +22,8 @@ -include_lib("eunit/include/eunit.hrl"). +-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). + all() -> [{group, protocol}, {group, pubsub}, @@ -62,7 +64,10 @@ groups() -> [dispatch_retained_messages]}, {backend, [sequence], []}, - {http, [sequence], [request_status]}, + {http, [sequence], + [request_status, + request_publish + ]}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -330,6 +335,32 @@ request_status(_) -> httpc:request(get, {Url, []}, [], []), ?assertEqual(binary_to_list(Status), Return). +request_publish(_) -> + ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), + Params = "qos=1&retain=0&topic=a/b/c&message=hello", + ?assert(connect_emqttd_publish_(post, "mqtt/publish", Params, auth_header_("", ""))), + ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), + emqttd:unsubscribe(<<"a/b/c">>). + +connect_emqttd_publish_(Method, Api, Params, Auth) -> + Url = "http://127.0.0.1:8083/" ++ Api, + case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of + {error, socket_closed_remotely} -> + false; + {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> + true; + {ok, {{"HTTP/1.1", 400, _}, _, []}} -> + false; + {ok, {{"HTTP/1.1", 404, _}, _, []}} -> + false + end. + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + + + %% cli group %%--------------------------------------------------------------------