Merge pull request #1376 from emqtt/send_timeout

Add send_timeout, send_timeout_close options
This commit is contained in:
Feng Lee 2017-12-06 09:40:03 +08:00 committed by GitHub
commit a86fe1c066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 100 additions and 8 deletions

View File

@ -343,6 +343,10 @@ listener.tcp.external.access.2 = allow all
## TCP Socket Options ## TCP Socket Options
listener.tcp.external.backlog = 1024 listener.tcp.external.backlog = 1024
listener.tcp.external.send_timeout = 15s
listener.tcp.external.send_timeout_close = on
#listener.tcp.external.recbuf = 4KB #listener.tcp.external.recbuf = 4KB
#listener.tcp.external.sndbuf = 4KB #listener.tcp.external.sndbuf = 4KB
@ -371,6 +375,10 @@ listener.tcp.internal.max_clients = 102400
## TCP Socket Options ## TCP Socket Options
listener.tcp.internal.backlog = 512 listener.tcp.internal.backlog = 512
listener.tcp.internal.send_timeout = 15s
listener.tcp.external.send_timeout_close = on
listener.tcp.internal.tune_buffer = on listener.tcp.internal.tune_buffer = on
listener.tcp.internal.buffer = 1MB listener.tcp.internal.buffer = 1MB
@ -477,6 +485,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## SSL Socket Options ## SSL Socket Options
## listener.ssl.external.backlog = 1024 ## listener.ssl.external.backlog = 1024
## listener.ssl.external.send_timeout = 15s
## listener.ssl.external.send_timeout_close = on
## listener.ssl.external.recbuf = 4KB ## listener.ssl.external.recbuf = 4KB
## listener.ssl.external.sndbuf = 4KB ## listener.ssl.external.sndbuf = 4KB
@ -499,6 +511,10 @@ listener.ws.external.access.1 = allow all
## TCP Options ## TCP Options
listener.ws.external.backlog = 1024 listener.ws.external.backlog = 1024
listener.ws.external.send_timeout = 15s
listener.ws.external.send_timeout_close = on
listener.ws.external.recbuf = 4KB listener.ws.external.recbuf = 4KB
listener.ws.external.sndbuf = 4KB listener.ws.external.sndbuf = 4KB
@ -531,6 +547,20 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## listener.wss.external.fail_if_no_peer_cert = true ## listener.wss.external.fail_if_no_peer_cert = true
listener.wss.external.backlog = 1024
listener.wss.external.send_timeout = 15s
listener.wss.external.send_timeout_close = on
## listener.wss.external.recbuf = 4KB
## listener.wss.external.sndbuf = 4KB
## listener.wss.external.buffer = 4KB
## listener.wss.external.nodelay = true
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## HTTP Management API Listener ## HTTP Management API Listener
@ -542,6 +572,12 @@ listener.api.mgmt.max_clients = 64
listener.api.mgmt.access.1 = allow all listener.api.mgmt.access.1 = allow all
listener.api.mgmt.backlog = 512
listener.api.mgmt.send_timeout = 15s
listener.api.mgmt.send_timeout_close = on
##------------------------------------------------------------------- ##-------------------------------------------------------------------
## System Monitor ## System Monitor
##------------------------------------------------------------------- ##-------------------------------------------------------------------

View File

@ -805,8 +805,18 @@ end}.
]}. ]}.
{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ {mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [
{default, 1024}, {datatype, integer},
{datatype, integer} {default, 1024}
]}.
{mapping, "listener.tcp.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.tcp.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}. ]}.
{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ {mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [
@ -883,6 +893,16 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.ssl.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.ssl.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ {mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize}, {datatype, bytesize},
hidden hidden
@ -996,6 +1016,16 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.ws.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.ws.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ {mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize}, {datatype, bytesize},
hidden hidden
@ -1059,6 +1089,16 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.wss.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.wss.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ {mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize}, {datatype, bytesize},
hidden hidden
@ -1145,6 +1185,8 @@ end}.
end, end,
TcpOpts = fun(Prefix) -> TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
@ -1252,6 +1294,16 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.api.$name.send_timeout", "emqttd.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.api.$name.send_timeout_close", "emqttd.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ {mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
{datatype, bytesize}, {datatype, bytesize},
hidden hidden

View File

@ -140,7 +140,8 @@ send_fun(Conn, Peername) ->
?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)), emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
try Conn:async_send(Data) of try Conn:async_send(Data) of
true -> ok ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
catch catch
error:Error -> Self ! {shutdown, Error} error:Error -> Self ! {shutdown, Error}
end end

View File

@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id = ClientId,
emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
send(Packet = ?PACKET(Type), send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
SendFun(Packet), SendFun(Packet),
Stats1 = inc_stats(send, Type, Stats), {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
{ok, State#proto_state{stats_data = Stats1}}.
trace(recv, Packet, ProtoState) -> trace(recv, Packet, ProtoState) ->
?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);

View File

@ -38,6 +38,7 @@ handle_request(Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Over WebSocket %% MQTT Over WebSocket
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) -> handle_request('GET', "/mqtt", Req) ->
lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"), Upgrade = Req:get_header_value("Upgrade"),

View File

@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
send_fun(ReplyChannel) -> send_fun(ReplyChannel) ->
Self = self(),
fun(Packet) -> fun(Packet) ->
Data = emqttd_serializer:serialize(Packet), Data = emqttd_serializer:serialize(Packet),
emqttd_metrics:inc('bytes/sent', iolist_size(Data)), emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data}) case ReplyChannel({binary, Data}) of
ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
end
end. end.
stat_fun(Conn) -> stat_fun(Conn) ->