Shutdown the connection if an error occurred when sending data

This commit is contained in:
Feng Lee 2017-12-05 23:41:40 +08:00
parent 8e41aeeeb8
commit 51533dbe9e
4 changed files with 10 additions and 6 deletions

View File

@ -140,7 +140,8 @@ send_fun(Conn, Peername) ->
?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)), emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
try Conn:async_send(Data) of try Conn:async_send(Data) of
true -> ok ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
catch catch
error:Error -> Self ! {shutdown, Error} error:Error -> Self ! {shutdown, Error}
end end

View File

@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id = ClientId,
emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
send(Packet = ?PACKET(Type), send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
SendFun(Packet), SendFun(Packet),
Stats1 = inc_stats(send, Type, Stats), {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
{ok, State#proto_state{stats_data = Stats1}}.
trace(recv, Packet, ProtoState) -> trace(recv, Packet, ProtoState) ->
?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);

View File

@ -38,6 +38,7 @@ handle_request(Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Over WebSocket %% MQTT Over WebSocket
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) -> handle_request('GET', "/mqtt", Req) ->
lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"), Upgrade = Req:get_header_value("Upgrade"),

View File

@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
send_fun(ReplyChannel) -> send_fun(ReplyChannel) ->
Self = self(),
fun(Packet) -> fun(Packet) ->
Data = emqttd_serializer:serialize(Packet), Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)), emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data}) case ReplyChannel({binary, Data}) of
ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
end
end. end.
stat_fun(Conn) -> stat_fun(Conn) ->