feature(tcp): alarm when tcp connection congested (#3818)

This commit is contained in:
Shawn 2020-11-10 19:07:02 +08:00 committed by JianBo He
parent cb50156194
commit cc69225b6d
6 changed files with 77 additions and 3 deletions

View File

@ -116,6 +116,13 @@ listener.tcp.external.send_timeout_close = on
## Value: on | off
## listener.tcp.external.tune_buffer = off
## The socket is set to a busy state when the amount of data queued internally
## by the ERTS socket implementation reaches this limit.
##
## Value: on | off
## Defaults to 1MB
## listener.tcp.external.high_watermark = 1MB
## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
## sent immediately if the option is enabled.
##

View File

@ -1238,6 +1238,11 @@ end}.
hidden
]}.
{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [
{datatype, bytesize},
{default, "1MB"}
]}.
{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
@ -1330,6 +1335,11 @@ end}.
hidden
]}.
{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [
{datatype, bytesize},
{default, "1MB"}
]}.
{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
@ -1840,6 +1850,7 @@ end}.
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
end,

14
src/emqx.appup.src Normal file
View File

@ -0,0 +1,14 @@
{"4.2.1",
[
{"4.2.0", [
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}
]}
],
[
{"4.2.0", [
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []}
]}
]
}.

View File

@ -344,6 +344,8 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -103,6 +103,9 @@
-define(ENABLED(X), (X =/= undefined)).
-define(ALARM_TCP_CONGEST(Channel),
list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))).
-dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [ init/4
, init_state/3
@ -428,6 +431,7 @@ handle_msg(Msg, State) ->
terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
emqx_channel:terminate(Reason, Channel),
close_socket(State),
exit(Reason).
@ -594,11 +598,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
%% Send data
-spec(send(iodata(), state()) -> ok).
send(IoData, #state{transport = Transport, socket = Socket}) ->
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct),
case Transport:async_send(Socket, IoData) of
maybe_warn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
@ -606,6 +611,36 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
ok
end.
maybe_warn_congestion(Socket, Transport, Channel) ->
IsCongestAlarmSet = is_congestion_alarm_set(),
case is_congested(Socket, Transport) of
true when not IsCongestAlarmSet ->
{ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]),
{ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]),
ok = set_congestion_alarm(),
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts));
false when IsCongestAlarmSet ->
ok = clear_congestion_alarm(),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
_ -> ok
end.
is_congested(Socket, Transport) ->
case Transport:getstat(Socket, [send_pend]) of
{ok, [{send_pend, N}]} when N > 0 -> true;
_ -> false
end.
is_congestion_alarm_set() ->
case erlang:get(conn_congested) of
true -> true;
_ -> false
end.
set_congestion_alarm() ->
erlang:put(conn_congested, true), ok.
clear_congestion_alarm() ->
erlang:put(conn_congested, false), ok.
%%--------------------------------------------------------------------
%% Handle Info
@ -621,7 +656,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]),
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) ->

View File

@ -52,6 +52,9 @@ init_per_suite(Config) ->
ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
Config.
end_per_suite(_Config) ->
@ -62,6 +65,7 @@ end_per_suite(_Config) ->
ok = meck:unload(emqx_pd),
ok = meck:unload(emqx_metrics),
ok = meck:unload(emqx_hooks),
ok = meck:unload(emqx_alarm),
ok.
init_per_testcase(_TestCase, Config) ->
@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) ->
{ok, [{K, 0} || K <- Options]}
end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
Config.