diff --git a/etc/emq.conf b/etc/emq.conf index 9b37860b9..d6dacc8fb 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -343,6 +343,10 @@ listener.tcp.external.access.2 = allow all ## TCP Socket Options listener.tcp.external.backlog = 1024 +listener.tcp.external.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + #listener.tcp.external.recbuf = 4KB #listener.tcp.external.sndbuf = 4KB @@ -371,6 +375,10 @@ listener.tcp.internal.max_clients = 102400 ## TCP Socket Options listener.tcp.internal.backlog = 512 +listener.tcp.internal.send_timeout = 15s + +listener.tcp.external.send_timeout_close = on + listener.tcp.internal.tune_buffer = on listener.tcp.internal.buffer = 1MB @@ -477,6 +485,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## SSL Socket Options ## listener.ssl.external.backlog = 1024 +## listener.ssl.external.send_timeout = 15s + +## listener.ssl.external.send_timeout_close = on + ## listener.ssl.external.recbuf = 4KB ## listener.ssl.external.sndbuf = 4KB @@ -499,6 +511,10 @@ listener.ws.external.access.1 = allow all ## TCP Options listener.ws.external.backlog = 1024 +listener.ws.external.send_timeout = 15s + +listener.ws.external.send_timeout_close = on + listener.ws.external.recbuf = 4KB listener.ws.external.sndbuf = 4KB @@ -531,6 +547,20 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true +listener.wss.external.backlog = 1024 + +listener.wss.external.send_timeout = 15s + +listener.wss.external.send_timeout_close = on + +## listener.wss.external.recbuf = 4KB + +## listener.wss.external.sndbuf = 4KB + +## listener.wss.external.buffer = 4KB + +## listener.wss.external.nodelay = true + ##-------------------------------------------------------------------- ## HTTP Management API Listener @@ -542,6 +572,12 @@ listener.api.mgmt.max_clients = 64 listener.api.mgmt.access.1 = allow all +listener.api.mgmt.backlog = 512 + +listener.api.mgmt.send_timeout = 15s + +listener.api.mgmt.send_timeout_close = on + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index d05cc79cf..e8746582b 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -805,8 +805,18 @@ end}. ]}. {mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} + {datatype, integer}, + {default, 1024} +]}. + +{mapping, "listener.tcp.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.tcp.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} ]}. {mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ @@ -883,6 +893,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ssl.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ssl.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -996,6 +1016,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.ws.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ws.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -1059,6 +1089,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.wss.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.wss.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden @@ -1145,6 +1185,8 @@ end}. end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, @@ -1252,6 +1294,16 @@ end}. {datatype, integer} ]}. +{mapping, "listener.api.$name.send_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.api.$name.send_timeout_close", "emqttd.listeners", [ + {datatype, flag}, + {default, on} +]}. + {mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ {datatype, bytesize}, hidden diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 6631b4566..63bac7c8d 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -140,7 +140,8 @@ send_fun(Conn, Peername) -> ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), emqttd_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of - true -> ok + ok -> ok; + {error, Reason} -> Self ! {shutdown, Reason} catch error:Error -> Self ! {shutdown, Error} end diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 384f93225..c35e8ae50 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -341,13 +341,11 @@ send(Msg, State = #proto_state{client_id = ClientId, emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); -send(Packet = ?PACKET(Type), - State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> +send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), SendFun(Packet), - Stats1 = inc_stats(send, Type, Stats), - {ok, State#proto_state{stats_data = Stats1}}. + {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}. trace(recv, Packet, ProtoState) -> ?LOG(debug, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index c7d0b2119..35a7f9852 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -38,6 +38,7 @@ handle_request(Req) -> %%-------------------------------------------------------------------- %% MQTT Over WebSocket %%-------------------------------------------------------------------- + handle_request('GET', "/mqtt", Req) -> lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index b9d25ad3e..206f461bb 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -272,10 +272,14 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- send_fun(ReplyChannel) -> + Self = self(), fun(Packet) -> Data = emqttd_serializer:serialize(Packet), emqttd_metrics:inc('bytes/sent', iolist_size(Data)), - ReplyChannel({binary, Data}) + case ReplyChannel({binary, Data}) of + ok -> ok; + {error, Reason} -> Self ! {shutdown, Reason} + end end. stat_fun(Conn) ->