Porting code for congestion alarms from 4.3.0 to e4.2.6 (#4523)

* fix(congestion): port some code from 4.3.0
* chore(emqx): update the appup file
This commit is contained in:
Shawn 2021-04-12 10:13:08 +08:00 committed by turtleDeng
parent 927264d793
commit 7d003e0bfc
6 changed files with 89 additions and 62 deletions

View File

@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off
## Example: 100KB incoming per 10 seconds. ## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s #zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Whether to alarm the congested connections.
##
## Sometimes an MQTT connection (usually an MQTT subscriber) may get "congested" because
## there are too many packets to send. The socket tries to buffer the packets until the buffer is
## full. If more packets come after that, they will be "pending" in a queue
## and we consider the connection to be "congested".
##
## Enable this to send an alarm when there are any bytes pending in the queue. You could set
## the `listener.tcp.<ZoneName>.sndbuf` to a larger value if the alarm is triggered too often.
##
## The name of the alarm is of format "conn_congestion/<ClientID>/<Username>".
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username, or "unknown_user" if not provided by the client.
## Default: off
#zone.external.conn_congestion.alarm = off
## The minimum time a congestion alarm is kept before it can be cleared.
## The alarm is cleared only when there are no pending bytes in the queue, and at least
## `min_alarm_sustain_duration` has passed since the last time we considered the connection "congested".
##
## This is to avoid clearing and sending the alarm again too often.
## Default: 1m
#zone.external.conn_congestion.min_alarm_sustain_duration = 1m
## Messages quota for the each of external MQTT connection. ## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message. ## This value consumed by the number of recipient on a message.
## ##

View File

@ -999,6 +999,16 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
%% Per-zone switch for the connection congestion alarm.
%% Declared as an on/off flag; the alarm is off unless explicitly enabled.
{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [
{datatype, flag},
{default, off}
]}.
%% Per-zone minimum time a congestion alarm is sustained before it may be cleared.
%% Declared as a duration in milliseconds; defaults to "1m".
{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [
{default, "1m"},
{datatype, {duration, ms}}
]}.
{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [ {mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [
{datatype, string} {datatype, string}
]}. ]}.
@ -1128,6 +1138,10 @@ end}.
{ratelimit, {conn_messages_in, Ratelimit(Val)}}; {ratelimit, {conn_messages_in, Ratelimit(Val)}};
(["rate_limit", "conn_bytes_in"], Val) -> (["rate_limit", "conn_bytes_in"], Val) ->
{ratelimit, {conn_bytes_in, Ratelimit(Val)}}; {ratelimit, {conn_bytes_in, Ratelimit(Val)}};
(["conn_congestion", "alarm"], Val) ->
{conn_congestion_alarm_enabled, Val};
(["conn_congestion", "min_alarm_sustain_duration"], Val) ->
{conn_congestion_min_alarm_sustain_duration, Val};
(["quota", "conn_messages_routing"], Val) -> (["quota", "conn_messages_routing"], Val) ->
{quota, {conn_messages_routing, Ratelimit(Val)}}; {quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) -> (["quota", "overall_messages_routing"], Val) ->

View File

@ -44,11 +44,17 @@
{<<"4.2.4">>, [ {<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []} {load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}
]}, ]},
{<<"4.2.5">>, [ {<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []} {load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
@ -95,11 +101,17 @@
{<<"4.2.4">>, [ {<<"4.2.4">>, [
{load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []} {load_module, emqx_session, brutal_purge, soft_purge, []}
]}, ]},
{<<"4.2.5">>, [ {<<"4.2.5">>, [
{load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_session, brutal_purge, soft_purge, []} {load_module, emqx_session, brutal_purge, soft_purge, []},
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_alarm, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
] ]

View File

@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); list_to_binary(io_lib:format("connection congested: ~s", [Info]));
normalize_message(_Name, _UnknownDetails) -> normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>. <<"Unknown alarm">>.

View File

@ -16,17 +16,16 @@
-module(emqx_congestion). -module(emqx_congestion).
-export([ maybe_alarm_port_busy/3 -export([ maybe_alarm_conn_congestion/3
, maybe_alarm_port_busy/4
, maybe_alarm_too_many_publish/5
, maybe_alarm_too_many_publish/6
, cancel_alarms/3 , cancel_alarms/3
]). ]).
-define(ALARM_CONN_CONGEST(Channel, Reason), -define(ALARM_CONN_CONGEST(Channel, Reason),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel), list_to_binary(
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>), io_lib:format("~s/~s/~s",
Reason]))). [Reason, emqx_channel:info(clientid, Channel),
maps:get(username, emqx_channel:info(clientinfo, Channel),
<<"unknown_user">>)]))).
-define(ALARM_CONN_INFO_KEYS, [ -define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername, clientid, username, proto_name, proto_ver, socktype, sockname, peername, clientid, username, proto_name, proto_ver,
@ -36,44 +35,28 @@
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). -define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
-define(ALARM_SENT(REASON), {alarm_sent, REASON}). -define(ALARM_SENT(REASON), {alarm_sent, REASON}).
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). -define(ALL_ALARM_REASONS, [conn_congestion]).
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). -define(WONT_CLEAR_IN, 60000).
-define(CONFIRM_CLEAR_INTERVAL, 60000).
maybe_alarm_port_busy(Socket, Transport, Channel) -> maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
maybe_alarm_port_busy(Socket, Transport, Channel, false). case is_alarm_enabled(Channel) of
false -> ok;
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> true ->
case is_tcp_congested(Socket, Transport) of case is_tcp_congested(Socket, Transport) of
true -> alarm_congestion(Socket, Transport, Channel, port_busy); true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
ForceClear) end
end. end.
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize) ->
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
MaxBatchSize, false).
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
PubMsgCount = _MaxBatchSize, _ForceClear) ->
%% we only alarm it when the process is "too busy"
alarm_congestion(Socket, Transport, Channel, too_many_publish);
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
%% but we clear the alarm until it is really "idle", to avoid sending
%% alarms and clears too frequently
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
ForceClear);
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
_MaxBatchSize, _ForceClear) ->
ok.
cancel_alarms(Socket, Transport, Channel) -> cancel_alarms(Socket, Transport, Channel) ->
lists:foreach(fun(Reason) -> lists:foreach(fun(Reason) ->
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
end, ?ALL_ALARM_REASONS). end, ?ALL_ALARM_REASONS).
%% Looks up the `conn_congestion_alarm_enabled' setting of the zone the
%% channel belongs to, passing `false' as the default to emqx_zone:get_env/3.
is_alarm_enabled(Channel) ->
    Zone = emqx_channel:info(zone, Channel),
    emqx_zone:get_env(Zone, conn_congestion_alarm_enabled, false).
alarm_congestion(Socket, Transport, Channel, Reason) -> alarm_congestion(Socket, Transport, Channel, Reason) ->
case has_alarm_sent(Reason) of case has_alarm_sent(Reason) of
false -> do_alarm_congestion(Socket, Transport, Channel, Reason); false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) ->
update_alarm_sent_at(Reason) update_alarm_sent_at(Reason)
end. end.
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
case is_alarm_allowed_clear(Reason, ForceClear) of Zone = emqx_channel:info(zone, Channel),
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
?WONT_CLEAR_IN),
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
false -> ok false -> ok
end. end.
@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) ->
LastSentAt -> LastSentAt LastSentAt -> LastSentAt
end. end.
is_alarm_allowed_clear(Reason, _ForceClear = true) -> long_time_since_last_alarm(Reason, WontClearIn) ->
has_alarm_sent(Reason);
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
%% only sent clears when the alarm was not triggered in the last %% only sent clears when the alarm was not triggered in the last
%% ?CONFIRM_CLEAR_INTERVAL time %% WontClearIn time
case timenow() - get_alarm_sent_at(Reason) of case timenow() - get_alarm_sent_at(Reason) of
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; Elapse when Elapse >= WontClearIn -> true;
_ -> false _ -> false
end. end.

View File

@ -374,12 +374,8 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = MaxBatchSize, transport = Transport, #state{active_n = ActiveN} = State) ->
socket = Socket, channel = Channel} = State) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
length(Delivers0), MaxBatchSize),
Delivers = [Deliver|Delivers0],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
%% Something sent %% Something sent
@ -548,12 +544,9 @@ handle_timeout(_TRef, limit_timeout, State) ->
}, },
handle_info(activate_socket, NState); handle_info(activate_socket, NState);
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport,
channel = Channel, transport = Transport, socket = Socket}) -> socket = Socket}) ->
{_, MsgQLen} = erlang:process_info(self(), message_queue_len), emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
MsgQLen, MaxBatchSize),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)), emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}}; {ok, State#state{stats_timer = undefined}};
@ -666,7 +659,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct), ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_pd:inc_counter(outgoing_bytes, Oct), emqx_pd:inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok; ok -> ok;
Error = {error, _Reason} -> Error = {error, _Reason} ->