diff --git a/etc/emqx.conf b/etc/emqx.conf index ddc1ed755..daa69d713 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on ## Value: on | off ## listener.tcp.external.tune_buffer = off +## The socket is set to a busy state when the amount of data queued internally +## by the ERTS socket implementation reaches this limit. +## +## Value: Bytes +## Defaults to 1MB +## listener.tcp.external.high_watermark = 1MB + ## The TCP_NODELAY flag for MQTT connections. Small amounts of data are ## sent immediately if the option is enabled. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index ccecd8315..3abd63f76 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1244,6 +1244,11 @@ end}. hidden ]}. +{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1336,6 +1341,11 @@ end}. hidden ]}. +{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + {mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [ {datatype, flag}, hidden @@ -1844,6 +1854,7 @@ end}. 
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)}, {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 37d6fe1e8..f224f5c03 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); +normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) -> + list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index cdd57e752..0714d1798 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -103,6 +103,9 @@ -define(ENABLED(X), (X =/= undefined)). +-define(ALARM_TCP_CONGEST(Channel), + list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))). + -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 , init_state/3 @@ -429,6 +432,7 @@ handle_msg(Msg, State) -> terminate(Reason, State = #state{channel = Channel}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), + emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), emqx_channel:terminate(Reason, Channel), close_socket(State), exit(Reason). @@ -595,11 +599,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %% Send data -spec(send(iodata(), state()) -> ok). 
-send(IoData, #state{transport = Transport, socket = Socket}) ->
+send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
     Oct = iolist_size(IoData),
     ok = emqx_metrics:inc('bytes.sent', Oct),
     emqx_pd:inc_counter(outgoing_bytes, Oct),
-    case Transport:async_send(Socket, IoData) of
+    maybe_warn_congestion(Socket, Transport, Channel),
+    case Transport:async_send(Socket, IoData, [nosuspend]) of
         ok -> ok;
         Error = {error, _Reason} ->
             %% Send an inet_reply to postpone handling the error
@@ -607,6 +612,36 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
             ok
     end.
 
+%% Raise the congestion alarm while send_pend is non-zero and clear it once it
+%% is not. A failed stat/opts query only empties the alarm details (no crash).
+maybe_warn_congestion(Socket, Transport, Channel) ->
+    case {is_congested(Socket, Transport), is_congestion_alarm_set()} of
+        {true, false} ->
+            Stat = ok_or_empty(Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct])),
+            Opts = ok_or_empty(Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer])),
+            ok = set_congestion_alarm(),
+            emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts));
+        {false, true} ->
+            ok = clear_congestion_alarm(),
+            emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
+        _ -> ok
+    end.
+
+%% Unwrap a getstat/getopts reply; an {error, _} reply yields no details.
+ok_or_empty({ok, KVs}) -> KVs;
+ok_or_empty({error, _Reason}) -> [].
+
+%% True when the transport reports a non-zero send_pend for the socket.
+is_congested(Socket, Transport) ->
+    case Transport:getstat(Socket, [send_pend]) of
+        {ok, [{send_pend, N}]} when N > 0 -> true;
+        _ -> false
+    end.
+%% The alarm flag lives in this process's dictionary under conn_congested.
+is_congestion_alarm_set() -> erlang:get(conn_congested) =:= true.
+set_congestion_alarm() -> erlang:put(conn_congested, true), ok.
+clear_congestion_alarm() -> erlang:put(conn_congested, false), ok.
+ %%-------------------------------------------------------------------- %% Handle Info @@ -622,7 +657,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) -> end; handle_info({sock_error, Reason}, State) -> - ?LOG(debug, "Socket error: ~p", [Reason]), + Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); handle_info(Info, State) -> diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index b908fe4ab..2538aeecb 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -52,6 +52,9 @@ init_per_suite(Config) -> ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end), + ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end), + ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), + Config. end_per_suite(_Config) -> @@ -62,6 +65,7 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), ok = meck:unload(emqx_hooks), + ok = meck:unload(emqx_alarm), ok. init_per_testcase(_TestCase, Config) -> @@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) -> {ok, [{K, 0} || K <- Options]} end), ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), + ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), Config.