diff --git a/etc/emqx.conf b/etc/emqx.conf index 1eebed652..5927ad99d 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -876,6 +876,30 @@ zone.external.enable_flapping_detect = off ## Example: 100KB incoming per 10 seconds. #zone.external.rate_limit.conn_bytes_in = 100KB,10s +## Whether to raise an alarm for congested connections. +## +## Sometimes an MQTT connection (usually an MQTT subscriber) may get "congested" because +## there are too many packets to send. The socket tries to buffer the packets until the buffer is +## full. If more packets come after that, the packets will be "pending" in a queue +## and we consider the connection to be "congested". +## +## Enable this to send an alarm when there are any bytes pending in the queue. You could set +## the `listener.tcp.$name.sndbuf` to a larger value if the alarm is triggered too often. +## +## The name of the alarm is of format "conn_congestion/$clientid/$username". +## Where $clientid is the client-id of the congested MQTT connection. +## And $username is the username, or "unknown_user" if not provided by the client. +## Default: off +#zone.external.conn_congestion.alarm = off + +## The minimum time the congestion alarm is kept before it may be cleared. +## The alarm is cleared only when there are no pending bytes in the queue, and at least +## `wont_clear_alarm_in` has elapsed since the last time the connection was considered "congested". +## +## This is to avoid clearing and sending the alarm again too often. +## Default: 1m +#zone.external.conn_congestion.wont_clear_alarm_in = 1m + ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index e09db53bb..9bffa6987 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1015,6 +1015,16 @@ end}. {datatype, string} ]}. +{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [ + {datatype, flag}, + {default, off} +]}.
+ +{mapping, "zone.$name.conn_congestion.wont_clear_alarm_in", "emqx.zones", [ + {default, "1m"}, + {datatype, {duration, ms}} +]}. + {mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [ {datatype, string} ]}. @@ -1144,6 +1154,10 @@ end}. {ratelimit, {conn_messages_in, Ratelimit(Val)}}; (["rate_limit", "conn_bytes_in"], Val) -> {ratelimit, {conn_bytes_in, Ratelimit(Val)}}; + (["conn_congestion", "alarm"], Val) -> + {conn_congestion_alarm_enabled, Val}; + (["conn_congestion", "wont_clear_alarm_in"], Val) -> + {conn_congestion_wont_clear_alarm_in, Val}; (["quota", "conn_messages_routing"], Val) -> {quota, {conn_messages_routing, Ratelimit(Val)}}; (["quota", "overall_messages_routing"], Val) -> diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index b17443c45..7b4ca5989 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -379,7 +379,7 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); -normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> - list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); +normalize_message(<<"conn_congestion/", Info/binary>>, _) -> + list_to_binary(io_lib:format("connection congested: ~s", [Info])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. diff --git a/src/emqx_congestion.erl b/src/emqx_congestion.erl index 0e7992f2e..14e5f6f6e 100644 --- a/src/emqx_congestion.erl +++ b/src/emqx_congestion.erl @@ -16,22 +16,16 @@ -module(emqx_congestion). --export([ maybe_alarm_port_busy/3 - , maybe_alarm_port_busy/4 - , maybe_alarm_too_many_publish/5 - , maybe_alarm_too_many_publish/6 +-export([ maybe_alarm_conn_congestion/3 , cancel_alarms/3 ]). --elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]). 
- -define(ALARM_CONN_CONGEST(Channel, Reason), list_to_binary( - io_lib:format("mqtt_conn/congested/~s/~s/~s", - [emqx_channel:info(clientid, Channel), + io_lib:format("~s/~s/~s", + [Reason, emqx_channel:info(clientid, Channel), maps:get(username, emqx_channel:info(clientinfo, Channel), - <<"undefined">>), - Reason]))). + <<"unknown_user">>)]))). -define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username, proto_name, proto_ver, connected_at, conn_state]). @@ -39,44 +33,28 @@ -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). -define(ALARM_SENT(REASON), {alarm_sent, REASON}). --define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). --define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). --define(CONFIRM_CLEAR_INTERVAL, 10000). +-define(ALL_ALARM_REASONS, [conn_congestion]). +-define(WONT_CLEAR_IN, 60000). -maybe_alarm_port_busy(Socket, Transport, Channel) -> - maybe_alarm_port_busy(Socket, Transport, Channel, false). - -maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> - case is_tcp_congested(Socket, Transport) of - true -> alarm_congestion(Socket, Transport, Channel, port_busy); - false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, - ForceClear) +maybe_alarm_conn_congestion(Socket, Transport, Channel) -> + case is_alarm_enabled(Channel) of + false -> ok; + true -> + case is_tcp_congested(Socket, Transport) of + true -> alarm_congestion(Socket, Transport, Channel, conn_congestion); + false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion) + end end. -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - MaxBatchSize) -> - maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - MaxBatchSize, false). 
- -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - PubMsgCount = _MaxBatchSize, _ForceClear) -> - %% we only alarm it when the process is "too busy" - alarm_congestion(Socket, Transport, Channel, too_many_publish); -maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, - _MaxBatchSize, ForceClear) when PubMsgCount == 0 -> - %% but we clear the alarm until it is really "idle", to avoid sending - %% alarms and clears too frequently - cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish, - ForceClear); -maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount, - _MaxBatchSize, _ForceClear) -> - ok. - cancel_alarms(Socket, Transport, Channel) -> lists:foreach(fun(Reason) -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) end, ?ALL_ALARM_REASONS). +is_alarm_enabled(Channel) -> + emqx_zone:get_env(emqx_channel:info(zone, Channel), + conn_congestion_alarm_enabled, false). + alarm_congestion(Socket, Transport, Channel, Reason) -> case has_alarm_sent(Reason) of false -> do_alarm_congestion(Socket, Transport, Channel, Reason); @@ -85,8 +63,10 @@ alarm_congestion(Socket, Transport, Channel, Reason) -> update_alarm_sent_at(Reason) end. -cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> - case is_alarm_allowed_clear(Reason, ForceClear) of +cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> + Zone = emqx_channel:info(zone, Channel), + WontClearIn = emqx_zone:get_env(Zone, conn_congestion_wont_clear_alarm_in, ?WONT_CLEAR_IN), + case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); false -> ok end. @@ -125,14 +105,11 @@ get_alarm_sent_at(Reason) -> undefined -> 0; LastSentAt -> LastSentAt end. 
- -is_alarm_allowed_clear(Reason, _ForceClear = true) -> - has_alarm_sent(Reason); -is_alarm_allowed_clear(Reason, _ForceClear = false) -> +long_time_since_last_alarm(Reason, WontClearIn) -> %% only sent clears when the alarm was not triggered in the last - %% ?CONFIRM_CLEAR_INTERVAL time + %% WontClearIn time case timenow() - get_alarm_sent_at(Reason) of - Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; + Elapse when Elapse >= WontClearIn -> true; _ -> false end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 1d17cac03..eee3b9c5a 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -390,12 +390,8 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active_n = MaxBatchSize, transport = Transport, - socket = Socket, channel = Channel} = State) -> - Delivers0 = emqx_misc:drain_deliver(MaxBatchSize), - emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, - length(Delivers0), MaxBatchSize), - Delivers = [Deliver|Delivers0], + #state{active_n = ActiveN} = State) -> + Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -509,12 +505,9 @@ handle_timeout(_TRef, limit_timeout, State) -> }, handle_info(activate_socket, NState); -handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, - channel = Channel, transport = Transport, socket = Socket}) -> - {_, MsgQLen} = erlang:process_info(self(), message_queue_len), - emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true), - emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, - MsgQLen, MaxBatchSize, true), +handle_timeout(_TRef, emit_stats, State = #state{channel = Channel, transport = Transport, + socket = Socket}) -> + emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), ClientId = emqx_channel:info(clientid, Channel), 
emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; @@ -627,7 +620,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), inc_counter(outgoing_bytes, Oct), - emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), + emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} ->