send(Packet)
This commit is contained in:
parent
842df63605
commit
61a64ea0b0
|
@ -80,13 +80,20 @@ init([OriginConn, MqttEnv]) ->
|
||||||
end,
|
end,
|
||||||
ConnName = esockd_net:format(PeerName),
|
ConnName = esockd_net:format(PeerName),
|
||||||
Self = self(),
|
Self = self(),
|
||||||
SendFun = fun(Data) ->
|
|
||||||
|
%%TODO: Send packet...
|
||||||
|
SendFun = fun(Packet) ->
|
||||||
|
Data = emqttd_serializer:serialize(Packet),
|
||||||
|
%%TODO: How to Log???
|
||||||
|
?LOG(debug, "SEND ~p", [Data], #client_state{connname = ConnName}),
|
||||||
|
emqttd_metrics:inc('bytes/sent', size(Data)),
|
||||||
try Connection:async_send(Data) of
|
try Connection:async_send(Data) of
|
||||||
true -> ok
|
true -> ok
|
||||||
catch
|
catch
|
||||||
error:Error -> Self ! {shutdown, Error}
|
error:Error -> Self ! {shutdown, Error}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
||||||
PktOpts = proplists:get_value(packet, MqttEnv),
|
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||||
ParserFun = emqttd_parser:new(PktOpts),
|
ParserFun = emqttd_parser:new(PktOpts),
|
||||||
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
|
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
|
||||||
|
|
|
@ -236,8 +236,8 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
|
||||||
|
|
||||||
with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
||||||
State = #proto_state{client_id = ClientId,
|
State = #proto_state{client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
Msg = emqttd_message:from_packet(Username, ClientId, Packet),
|
Msg = emqttd_message:from_packet(Username, ClientId, Packet),
|
||||||
case emqttd_session:publish(Session, Msg) of
|
case emqttd_session:publish(Session, Msg) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -256,10 +256,7 @@ send(Packet, State = #proto_state{sendfun = SendFun})
|
||||||
when is_record(Packet, mqtt_packet) ->
|
when is_record(Packet, mqtt_packet) ->
|
||||||
trace(send, Packet, State),
|
trace(send, Packet, State),
|
||||||
emqttd_metrics:sent(Packet),
|
emqttd_metrics:sent(Packet),
|
||||||
Data = emqttd_serializer:serialize(Packet),
|
SendFun(Packet),
|
||||||
?LOG(debug, "SEND ~p", [Data], State),
|
|
||||||
emqttd_metrics:inc('bytes/sent', size(Data)),
|
|
||||||
SendFun(Data),
|
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
trace(recv, Packet, ProtoState) ->
|
trace(recv, Packet, ProtoState) ->
|
||||||
|
|
|
@ -108,7 +108,11 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
||||||
%%issue#413: trap_exit is unnecessary
|
%%issue#413: trap_exit is unnecessary
|
||||||
%%process_flag(trap_exit, true),
|
%%process_flag(trap_exit, true),
|
||||||
{ok, Peername} = Req:get(peername),
|
{ok, Peername} = Req:get(peername),
|
||||||
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
|
SendFun = fun(Packet) ->
|
||||||
|
Data = emqttd_serializer:serialize(Packet),
|
||||||
|
emqttd_metrics:inc('bytes/sent', size(Data)),
|
||||||
|
ReplyChannel({binary, Data})
|
||||||
|
end,
|
||||||
Headers = mochiweb_request:get(headers, Req),
|
Headers = mochiweb_request:get(headers, Req),
|
||||||
HeadersList = mochiweb_headers:to_list(Headers),
|
HeadersList = mochiweb_headers:to_list(Headers),
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun,
|
ProtoState = emqttd_protocol:init(Peername, SendFun,
|
||||||
|
|
Loading…
Reference in New Issue