diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f3169339a..89cc6a4a7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -80,13 +80,20 @@ init([OriginConn, MqttEnv]) -> end, ConnName = esockd_net:format(PeerName), Self = self(), - SendFun = fun(Data) -> + + %%TODO: Send packet... + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + %%TODO: How to Log??? + ?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}), + emqttd_metrics:inc('bytes/sent', size(Data)), try Connection:async_send(Data) of true -> ok catch error:Error -> Self ! {shutdown, Error} end end, + PktOpts = proplists:get_value(packet, MqttEnv), ParserFun = emqttd_parser:new(PktOpts), ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts), diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 7c8a87146..64594866e 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + username = Username, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), case emqttd_session:publish(Session, Msg) of ok -> @@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), - Data = emqttd_serializer:serialize(Packet), - ?LOG(debug, "SEND ~p", [Data], State), - emqttd_metrics:inc('bytes/sent', size(Data)), - SendFun(Data), + SendFun(Packet), {ok, State}. trace(recv, Packet, ProtoState) -> diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 776dc4ce5..7466148eb 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -108,7 +108,11 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> %%issue#413: trap_exit is unnecessary %%process_flag(trap_exit, true), {ok, Peername} = Req:get(peername), - SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, + SendFun = fun(Packet) -> + Data = emqttd_serializer:serialize(Packet), + emqttd_metrics:inc('bytes/sent', size(Data)), + ReplyChannel({binary, Data}) + end, Headers = mochiweb_request:get(headers, Req), HeadersList = mochiweb_headers:to_list(Headers), ProtoState = emqttd_protocol:init(Peername, SendFun,