diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index a40f9dc9c..f39c88441 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -40,6 +40,21 @@ end ). +%% NOTE: do not forget to use atom for msg and add every used msg to +%% the default value of `log.thorttling.msgs` list. +-define(SLOG_THROTTLE(Level, Data), + ?SLOG_THROTTLE(Level, Data, #{}) +). + +-define(SLOG_THROTTLE(Level, Data, Meta), + case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of + true -> + ?SLOG(Level, Data, Meta); + false -> + ok + end +). + -define(AUDIT_HANDLER, emqx_audit). -define(TRACE_FILTER, emqx_trace_filter). -define(OWN_KEYS, [level, filters, filter_default, handlers]). diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 13b02bb4d..d97dbd167 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -183,8 +183,13 @@ log_result(#{username := Username}, Topic, Action, From, Result) -> } end, case Result of - allow -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}); - deny -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"}) + allow -> + ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}); + deny -> + ?SLOG_THROTTLE( + warning, + (LogMeta())#{msg => authorization_permission_denied} + ) end. %% @private Format authorization rules source. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 658bc7bbb..192335a25 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -616,10 +616,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, #{ - msg => "cannot_publish_to_topic", + msg => cannot_publish_to_topic_due_to_not_authorized, reason => emqx_reason_codes:name(Rc) }, #{topic => Topic} @@ -635,10 +635,10 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> handle_out(disconnect, Rc, NChannel) end; {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} -> - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, #{ - msg => "cannot_publish_to_topic", + msg => cannot_publish_to_topic_due_to_quota_exceeded, reason => emqx_reason_codes:name(Rc) }, #{topic => Topic} diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 85724b9b4..5f1bd6ad1 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -40,7 +40,8 @@ init([]) -> child_spec(emqx_authn_authz_metrics_sup, supervisor), child_spec(emqx_ocsp_cache, worker), child_spec(emqx_crl_cache, worker), - child_spec(emqx_tls_lib_sup, supervisor) + child_spec(emqx_tls_lib_sup, supervisor), + child_spec(emqx_log_throttler, worker) ] }}. diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl new file mode 100644 index 000000000..ef29d5a79 --- /dev/null +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -0,0 +1,151 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_log_throttler). + +-behaviour(gen_server). + +-include("logger.hrl"). +-include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([start_link/0]). + +%% throttler API +-export([allow/2]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-define(SEQ_ID(Msg), {?MODULE, Msg}). +-define(NEW_SEQ, atomics:new(1, [{signed, false}])). +-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). +-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). +-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). +-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). + +-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). + +-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). +-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). + +-spec allow(logger:level(), atom()) -> boolean(). +allow(debug, _Msg) -> + true; +allow(_Level, Msg) when is_atom(Msg) -> + Seq = persistent_term:get(?SEQ_ID(Msg), undefined), + case Seq of + undefined -> + %% This is either a race condition (emqx_log_throttler is not started yet) + %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is + %% not added to the default value of `log.throttling.msgs`. + ?SLOG(info, #{ + msg => "missing_log_throttle_sequence", + throttled_msg => Msg + }), + true; + SeqRef -> + ?IS_ALLOWED(SeqRef) + end. + +-spec start_link() -> startlink_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + CurrentPeriodMs = ?TIME_WINDOW_MS, + TimerRef = schedule_refresh(CurrentPeriodMs), + {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. + +handle_call(Req, _From, State) -> + ?SLOG(error, #{msg => "unexpected_call", call => Req}), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), + {noreply, State}. + +handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> + Msgs = ?MSGS_LIST, + DroppedStats = lists:foldl( + fun(Msg, Acc) -> + case ?GET_SEQ(Msg) of + %% Should not happen, unless the static ids list is updated at run-time. + undefined -> + ?NEW_THROTTLE(Msg, ?NEW_SEQ), + ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), + Acc; + SeqRef -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc) + end + end, + #{}, + Msgs + ), + maybe_log_dropped(DroppedStats, PeriodMs), + NewPeriodMs = ?TIME_WINDOW_MS, + State1 = State#{ + timer_ref => schedule_refresh(NewPeriodMs), + current_period_ms => NewPeriodMs + }, + {noreply, State1}; +handle_info(Info, State) -> + ?SLOG(error, #{msg => "unxpected_info", info => Info}), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% internal functions +%%-------------------------------------------------------------------- + +maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 -> + DroppedAcc#{Msg => Dropped}; +maybe_add_dropped(_Msg, _Dropped, DroppedAcc) -> + DroppedAcc. + +maybe_log_dropped(DroppedStats, PeriodMs) when map_size(DroppedStats) > 0 -> + ?SLOG(warning, #{ + msg => "log_events_throttled_during_last_period", + dropped => DroppedStats, + period => emqx_utils_calendar:human_readable_duration_string(PeriodMs) + }); +maybe_log_dropped(_DroppedStats, _PeriodMs) -> + ok. + +schedule_refresh(PeriodMs) -> + ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), + erlang:send_after(PeriodMs, ?MODULE, refresh). diff --git a/apps/emqx/src/emqx_session_events.erl b/apps/emqx/src/emqx_session_events.erl index 856efac74..ac8dee262 100644 --- a/apps/emqx/src/emqx_session_events.erl +++ b/apps/emqx/src/emqx_session_events.erl @@ -62,10 +62,10 @@ handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}}) ok = emqx_metrics:inc('delivery.dropped.queue_full'), ok = inc_pd('send_msg.dropped', 1), ok = inc_pd('send_msg.dropped.queue_full', 1), - ?SLOG( - info, + ?SLOG_THROTTLE( + warning, Ctx#{ - msg => "dropped_msg_due_to_mqueue_is_full", + msg => dropped_msg_due_to_mqueue_is_full, payload => Msg#message.payload }, #{topic => Msg#message.topic} diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl new file mode 100644 index 000000000..441ef2d95 --- /dev/null +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -0,0 +1,170 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_log_throttler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% Have to use real msgs, as the schema is guarded by enum. +-define(THROTTLE_MSG, authorization_permission_denied). +-define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(TIME_WINDOW, <<"1s">>). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% This test suite can't be run in standalone tests (without emqx_conf) + case module_exists(emqx_conf) of + true -> + Apps = emqx_cth_suite:start( + [ + {emqx_conf, #{ + config => + #{ + log => #{ + throttling => #{ + time_window => ?TIME_WINDOW, msgs => [?THROTTLE_MSG] + } + } + } + }}, + emqx + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{suite_apps, Apps} | Config]; + false -> + {skip, standalone_not_supported} + end. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(). + +init_per_testcase(t_throttle_add_new_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}), + Config; +init_per_testcase(_TC, Config) -> + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(t_throttle_add_new_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; +end_per_testcase(t_update_time_window, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}), + ok; +end_per_testcase(_TC, _Config) -> + ok = snabbkaffe:stop(). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_throttle(_Config) -> + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. + lists:foreach( + fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end, + lists:seq(1, 100) + ), + {ok, _} = ?block_until( + #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 + ), + + ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), + ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), + %% Debug is always allowed + ?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ?THROTTLE_MSG, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + +t_throttle_add_new_msg(_Config) -> + ?check_trace( + begin + ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 + ), + ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), + ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ?THROTTLE_MSG1, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + +t_throttle_no_msg(_Config) -> + %% Must simply pass with no crashes + ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), + timer:sleep(10), + ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). + +t_update_time_window(_Config) -> + ?check_trace( + begin + ?wait_async_action( + emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}), + #{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000}, + 5000 + ), + timer:sleep(10), + ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))) + end, + [] + ). + +%%-------------------------------------------------------------------- +%% internal functions +%%-------------------------------------------------------------------- + +module_exists(Mod) -> + case erlang:module_loaded(Mod) of + true -> + true; + false -> + case code:ensure_loaded(Mod) of + ok -> true; + {module, Mod} -> true; + _ -> false + end + end. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 914470ba4..19ca74f4e 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -75,6 +75,14 @@ %% 1 million default ports counter -define(DEFAULT_MAX_PORTS, 1024 * 1024). +-define(LOG_THROTTLING_MSGS, [ + authorization_permission_denied, + cannot_publish_to_topic_due_to_not_authorized, + cannot_publish_to_topic_due_to_quota_exceeded, + connection_rejected_due_to_license_limit_reached, + dropped_msg_due_to_mqueue_is_full +]). + %% Callback to upgrade config after loaded from config file but before validation. upgrade_raw_conf(Raw0) -> Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), @@ -909,7 +917,12 @@ fields("log") -> aliases => [file_handlers], importance => ?IMPORTANCE_HIGH } - )} + )}, + {throttling, + sc(?R_REF("log_throttling"), #{ + desc => ?DESC("log_throttling"), + importance => ?IMPORTANCE_MEDIUM + })} ]; fields("console_handler") -> log_handler_common_confs(console, #{}); @@ -1012,6 +1025,28 @@ fields("log_burst_limit") -> } )} ]; +fields("log_throttling") -> + [ + {time_window, + sc( + emqx_schema:duration_s(), + #{ + default => <<"1m">>, + desc => ?DESC("log_throttling_time_window"), + importance => ?IMPORTANCE_MEDIUM + } + )}, + %% A static list of msgs used in ?SLOG_THROTTLE/2,3 macro. + %% For internal (developer) use only. + {msgs, + sc( + hoconsc:array(hoconsc:enum(?LOG_THROTTLING_MSGS)), + #{ + default => ?LOG_THROTTLING_MSGS, + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; fields("authorization") -> emqx_schema:authz_fields() ++ emqx_authz_schema:authz_fields(). @@ -1046,6 +1081,8 @@ desc("log_burst_limit") -> ?DESC("desc_log_burst_limit"); desc("authorization") -> ?DESC("desc_authorization"); +desc("log_throttling") -> + ?DESC("desc_log_throttling"); desc(_) -> undefined. diff --git a/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl b/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl index 2cb699036..b83d933fa 100644 --- a/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl @@ -35,6 +35,10 @@ level = info path = \"log/emqx.log\" } + throttling { + msgs = [] + time_window = 1m + } } "). @@ -84,7 +88,9 @@ t_log_conf(_Conf) -> <<"time_offset">> => <<"system">> }, <<"file">> => - #{<<"default">> => FileExpect} + #{<<"default">> => FileExpect}, + <<"throttling">> => + #{<<"time_window">> => <<"1m">>, <<"msgs">> => []} }, ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])), UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1), diff --git a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl index bf1f358ea..ec9ae6c02 100644 --- a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl +++ b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl @@ -102,5 +102,8 @@ t_audit_log_conf(_Config) -> <<"time_offset">> => <<"system">> } }, - ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])), + %% The default value of throttling.msgs can be frequently updated, + %% remove it here, otherwise this test needs to be updated each time + %% a new throttle event is added. + ?assertEqual(ExpectLog1, maps:remove(<<"throttling">>, emqx_conf:get_raw([<<"log">>]))), ok. diff --git a/apps/emqx_license/src/emqx_license.erl b/apps/emqx_license/src/emqx_license.erl index c0fc10b91..fd80cd2c7 100644 --- a/apps/emqx_license/src/emqx_license.erl +++ b/apps/emqx_license/src/emqx_license.erl @@ -85,7 +85,10 @@ check(_ConnInfo, AckProps) -> {ok, #{max_connections := MaxClients}} -> case check_max_clients_exceeded(MaxClients) of true -> - ?SLOG(info, #{msg => "connection_rejected_due_to_license_limit_reached"}), + ?SLOG_THROTTLE( + error, + #{msg => connection_rejected_due_to_license_limit_reached} + ), {stop, {error, ?RC_QUOTA_EXCEEDED}}; false -> {ok, AckProps} diff --git a/changes/ce/feat-12520.en.md b/changes/ce/feat-12520.en.md new file mode 100644 index 000000000..593b66ec4 --- /dev/null +++ b/changes/ce/feat-12520.en.md @@ -0,0 +1,2 @@ +Implement log throttling. The feature reduces the number of potentially flooding logged events by +dropping all but the first event within a configured time window. diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index 32828b377..889bfafa5 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -475,6 +475,19 @@ log_burst_limit_window_time.desc: log_burst_limit_window_time.label: """Window Time""" +desc_log_throttling.label: +"""Log Throttling""" + +desc_log_throttling.desc: +"""Log throttling feature reduces the number of potentially flooding logged events by +dropping all but the first event within a configured time window.""" + +log_throttling_time_window.desc: +"""For throttled messages, only log 1 in each time window.""" + +log_throttling_time_window.label: +"""Log Throttling Time Window""" + cluster_dns_record_type.desc: """DNS record type."""