diff --git a/etc/zones.conf b/etc/zones.conf index ea1e1807c..a290eaa4d 100644 --- a/etc/zones.conf +++ b/etc/zones.conf @@ -184,6 +184,30 @@ zone.external.enable_flapping_detect = off ## Example: 100KB incoming per 10 seconds. #zone.external.rate_limit.conn_bytes_in = 100KB,10s +## Whether to raise an alarm for congested connections. +## +## Sometimes an MQTT connection (usually an MQTT subscriber) may get "congested" because +## there are too many packets to send. The socket tries to buffer the packets until the buffer is +## full. If more packets come after that, the packets will be "pending" in a queue +## and we consider the connection "congested". +## +## Enable this to send an alarm when there are any bytes pending in the queue. You could set +## the `listener.tcp.<name>.sndbuf` to a larger value if the alarm is triggered too often. +## +## The name of the alarm is of the format "conn_congestion/<ClientID>/<Username>". +## Where <ClientID> is the client-id of the congested MQTT connection. +## And <Username> is the username, or "unknown_user" if not provided by the client. +## Default: off +#zone.external.conn_congestion.alarm = off + +## Minimum time a congestion alarm is sustained before it may be cleared. +## The alarm is cleared only when there are no pending bytes in the queue, and at least +## `min_alarm_sustain_duration` has elapsed since the connection was last considered "congested". +## +## This is to avoid clearing and sending the alarm again too often. +## Default: 1m +#zone.external.conn_congestion.min_alarm_sustain_duration = 1m + ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 830a0e934..50ccc8e9f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -999,6 +999,16 @@ end}. {datatype, string} ]}. +{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [ + {datatype, flag}, + {default, off} +]}.
+ +{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [ + {default, "1m"}, + {datatype, {duration, ms}} +]}. + {mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [ {datatype, string} ]}. @@ -1128,6 +1138,10 @@ end}. {ratelimit, {conn_messages_in, Ratelimit(Val)}}; (["rate_limit", "conn_bytes_in"], Val) -> {ratelimit, {conn_bytes_in, Ratelimit(Val)}}; + (["conn_congestion", "alarm"], Val) -> + {conn_congestion_alarm_enabled, Val}; + (["conn_congestion", "min_alarm_sustain_duration"], Val) -> + {conn_congestion_min_alarm_sustain_duration, Val}; (["quota", "conn_messages_routing"], Val) -> {quota, {conn_messages_routing, Ratelimit(Val)}}; (["quota", "overall_messages_routing"], Val) -> diff --git a/src/emqx.appup.src b/src/emqx.appup.src index d3f5ba2b6..2f5639c03 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -44,11 +44,17 @@ {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_session, brutal_purge, soft_purge, []} + {load_module, emqx_session, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_session, brutal_purge, soft_purge, []} + {load_module, emqx_session, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], @@ -95,11 +101,17 @@ {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, 
brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_session, brutal_purge, soft_purge, []} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_session, brutal_purge, soft_purge, []} + {load_module, emqx_session, brutal_purge, soft_purge, []}, + {load_module, emqx_congestion, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ] diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index 12c888e6c..ced6622bf 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -361,7 +361,7 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); -normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> - list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); +normalize_message(<<"conn_congestion/", Info/binary>>, _) -> + list_to_binary(io_lib:format("connection congested: ~s", [Info])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. diff --git a/src/emqx_congestion.erl b/src/emqx_congestion.erl index 9ad5b8ee5..3b71b633b 100644 --- a/src/emqx_congestion.erl +++ b/src/emqx_congestion.erl @@ -16,17 +16,16 @@ -module(emqx_congestion). --export([ maybe_alarm_port_busy/3 - , maybe_alarm_port_busy/4 - , maybe_alarm_too_many_publish/5 - , maybe_alarm_too_many_publish/6 +-export([ maybe_alarm_conn_congestion/3 , cancel_alarms/3 ]). -define(ALARM_CONN_CONGEST(Channel, Reason), - list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel), - maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>), - Reason]))). 
+ list_to_binary( + io_lib:format("~s/~s/~s", + [Reason, emqx_channel:info(clientid, Channel), + maps:get(username, emqx_channel:info(clientinfo, Channel), + <<"unknown_user">>)]))). -define(ALARM_CONN_INFO_KEYS, [ socktype, sockname, peername, clientid, username, proto_name, proto_ver, @@ -36,44 +35,28 @@ -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). -define(ALARM_SENT(REASON), {alarm_sent, REASON}). --define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). --define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). --define(CONFIRM_CLEAR_INTERVAL, 60000). +-define(ALL_ALARM_REASONS, [conn_congestion]). +-define(WONT_CLEAR_IN, 60000). -maybe_alarm_port_busy(Socket, Transport, Channel) -> - maybe_alarm_port_busy(Socket, Transport, Channel, false). - -maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> - case is_tcp_congested(Socket, Transport) of - true -> alarm_congestion(Socket, Transport, Channel, port_busy); - false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, - ForceClear) +maybe_alarm_conn_congestion(Socket, Transport, Channel) -> + case is_alarm_enabled(Channel) of + false -> ok; + true -> + case is_tcp_congested(Socket, Transport) of + true -> alarm_congestion(Socket, Transport, Channel, conn_congestion); + false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion) + end end. -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - MaxBatchSize) -> - maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - MaxBatchSize, false). 
- -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - PubMsgCount = _MaxBatchSize, _ForceClear) -> - %% we only alarm it when the process is "too busy" - alarm_congestion(Socket, Transport, Channel, too_many_publish); -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - _MaxBatchSize, ForceClear) when PubMsgCount == 0 -> - %% but we clear the alarm until it is really "idle", to avoid sending - %% alarms and clears too frequently - cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish, - ForceClear); -maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount, - _MaxBatchSize, _ForceClear) -> - ok. - cancel_alarms(Socket, Transport, Channel) -> lists:foreach(fun(Reason) -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) end, ?ALL_ALARM_REASONS). +is_alarm_enabled(Channel) -> + emqx_zone:get_env(emqx_channel:info(zone, Channel), + conn_congestion_alarm_enabled, false). + alarm_congestion(Socket, Transport, Channel, Reason) -> case has_alarm_sent(Reason) of false -> do_alarm_congestion(Socket, Transport, Channel, Reason); @@ -82,8 +65,11 @@ alarm_congestion(Socket, Transport, Channel, Reason) -> update_alarm_sent_at(Reason) end. -cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> - case is_alarm_allowed_clear(Reason, ForceClear) of +cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> + Zone = emqx_channel:info(zone, Channel), + WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration, + ?WONT_CLEAR_IN), + case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); false -> ok end. @@ -123,13 +109,11 @@ get_alarm_sent_at(Reason) -> LastSentAt -> LastSentAt end. 
-is_alarm_allowed_clear(Reason, _ForceClear = true) -> - has_alarm_sent(Reason); -is_alarm_allowed_clear(Reason, _ForceClear = false) -> +long_time_since_last_alarm(Reason, WontClearIn) -> %% only sent clears when the alarm was not triggered in the last - %% ?CONFIRM_CLEAR_INTERVAL time + %% WontClearIn time case timenow() - get_alarm_sent_at(Reason) of - Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; + Elapse when Elapse >= WontClearIn -> true; _ -> false end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2b28f327a..720b7ec98 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -374,12 +374,8 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active_n = MaxBatchSize, transport = Transport, - socket = Socket, channel = Channel} = State) -> - Delivers0 = emqx_misc:drain_deliver(MaxBatchSize), - emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, - length(Delivers0), MaxBatchSize), - Delivers = [Deliver|Delivers0], + #state{active_n = ActiveN} = State) -> + Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -548,12 +544,9 @@ handle_timeout(_TRef, limit_timeout, State) -> }, handle_info(activate_socket, NState); -handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, - channel = Channel, transport = Transport, socket = Socket}) -> - {_, MsgQLen} = erlang:process_info(self(), message_queue_len), - emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), - emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, - MsgQLen, MaxBatchSize), +handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport, + socket = Socket}) -> + emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), ClientId = emqx_channel:info(clientid, Channel), emqx_cm:set_chan_stats(ClientId, 
stats(State)), {ok, State#state{stats_timer = undefined}}; @@ -666,7 +659,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), emqx_pd:inc_counter(outgoing_bytes, Oct), - emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), + emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} ->