From 98ba300f7c78dd3ae8954300cdb5a3e7ba60ea01 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 15 Feb 2024 17:34:54 +0200 Subject: [PATCH] feat: implement log throttling --- apps/emqx/include/logger.hrl | 15 ++ apps/emqx/src/emqx_kernel_sup.erl | 3 +- apps/emqx/src/emqx_log_throttler.erl | 146 +++++++++++++++++ apps/emqx/test/emqx_log_throttler_SUITE.erl | 150 ++++++++++++++++++ apps/emqx_conf/src/emqx_conf_schema.erl | 31 +++- .../emqx_conf/test/emqx_conf_logger_SUITE.erl | 8 +- .../test/emqx_enterprise_schema_SUITE.erl | 5 +- changes/ce/feat-12520.en.md | 2 + rel/i18n/emqx_conf_schema.hocon | 13 ++ 9 files changed, 369 insertions(+), 4 deletions(-) create mode 100644 apps/emqx/src/emqx_log_throttler.erl create mode 100644 apps/emqx/test/emqx_log_throttler_SUITE.erl create mode 100644 changes/ce/feat-12520.en.md diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index a40f9dc9c..227af26b3 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -40,6 +40,21 @@ end ). +%% NOTE: do not forget to add every used msg to the default value of +%% `log.throttling.msgs` list. +-define(SLOG_THROTTLE(Level, Data), + ?SLOG_THROTTLE(Level, Data, #{}) +). + +-define(SLOG_THROTTLE(Level, Data, Meta), + case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of + true -> + ?SLOG(Level, Data, Meta); + false -> + ok + end +). + -define(AUDIT_HANDLER, emqx_audit). -define(TRACE_FILTER, emqx_trace_filter). -define(OWN_KEYS, [level, filters, filter_default, handlers]). 
diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 85724b9b4..5f1bd6ad1 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -40,7 +40,8 @@ init([]) -> child_spec(emqx_authn_authz_metrics_sup, supervisor), child_spec(emqx_ocsp_cache, worker), child_spec(emqx_crl_cache, worker), - child_spec(emqx_tls_lib_sup, supervisor) + child_spec(emqx_tls_lib_sup, supervisor), + child_spec(emqx_log_throttler, worker) ] }}. diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl new file mode 100644 index 000000000..93666da1b --- /dev/null +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -0,0 +1,146 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_log_throttler). + +-behaviour(gen_server). + +-include("logger.hrl"). +-include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([start_link/0]). + +%% throttler API +-export([allow/2]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-define(SEQ_ID(Msg), {?MODULE, Msg}). +-define(NEW_SEQ, atomics:new(1, [{signed, false}])). 
+-define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). +-define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). +-define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). +-define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). + +-define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). + +-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). +-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). + +-spec allow(logger:level(), string()) -> boolean(). +allow(debug, _Msg) -> + true; +allow(_Level, Msg) -> + Seq = persistent_term:get(?SEQ_ID(Msg), undefined), + case Seq of + undefined -> + %% This is either a race condition (emqx_log_throttler is not started yet) + %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is + %% not added to the default value of `log.throttling.msgs`. + ?SLOG(info, #{ + msg => "missing_log_throttle_sequence", + throttled_msg => Msg + }), + true; + SeqRef -> + ?IS_ALLOWED(SeqRef) + end. + +-spec start_link() -> startlink_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + TimerRef = schedule_refresh(?TIME_WINDOW_MS), + {ok, #{timer_ref => TimerRef}}. + +handle_call(Req, _From, State) -> + ?SLOG(error, #{msg => "unexpected_call", call => Req}), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), + {noreply, State}. 
+ +handle_info(refresh, State) -> + PeriodMs = ?TIME_WINDOW_MS, + Msgs = ?MSGS_LIST, + DroppedStats = lists:foldl( + fun(Msg, Acc) -> + case ?GET_SEQ(Msg) of + %% Should not happen, unless the static ids list is updated at run-time. + undefined -> + ?NEW_THROTTLE(Msg, ?NEW_SEQ), + ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), + Acc; + SeqRef -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc) + end + end, + #{}, + Msgs + ), + maybe_log_dropped(DroppedStats, PeriodMs), + State1 = State#{timer_ref => schedule_refresh(PeriodMs)}, + {noreply, State1}; +handle_info(Info, State) -> + ?SLOG(error, #{msg => "unexpected_info", info => Info}), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% internal functions +%%-------------------------------------------------------------------- + +maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 -> + DroppedAcc#{Msg => Dropped}; +maybe_add_dropped(_Msg, _Dropped, DroppedAcc) -> + DroppedAcc. + +maybe_log_dropped(DroppedStats, PeriodMs) when map_size(DroppedStats) > 0 -> + ?SLOG(warning, #{ + msg => "log_events_throttled_during_last_period", + dropped => DroppedStats, + period => emqx_utils_calendar:human_readable_duration_string(PeriodMs) + }); +maybe_log_dropped(_DroppedStats, _PeriodMs) -> + ok. + +schedule_refresh(PeriodMs) -> + erlang:send_after(PeriodMs, ?MODULE, refresh). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl new file mode 100644 index 000000000..c5689ea24 --- /dev/null +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -0,0 +1,150 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_log_throttler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(THROTTLE_MSG, "test_throttle_msg"). +-define(THROTTLE_MSG1, "test_throttle_msg1"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% This test suite can't be run in standalone tests (without emqx_conf) + case module_exists(emqx_conf) of + true -> + Apps = emqx_cth_suite:start( + [ + {emqx_conf, #{ + config => + #{ + log => #{ + throttling => #{ + time_window => <<"1s">>, msgs => [?THROTTLE_MSG] + } + } + } + }}, + emqx + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{suite_apps, Apps} | Config]; + false -> + {skip, standalone_not_supported} + end. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(). + +init_per_testcase(t_throttle_add_new_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}), + Config; +init_per_testcase(_TC, Config) -> + ok = snabbkaffe:start_trace(), + Config. 
+ +end_per_testcase(t_throttle_add_new_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; +end_per_testcase(_TC, _Config) -> + ok = snabbkaffe:stop(). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_throttle(_Config) -> + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. + lists:foreach( + fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end, + lists:seq(1, 100) + ), + {ok, _} = ?block_until( + #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 3000 + ), + + ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), + ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), + %% Debug is always allowed + ?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ?THROTTLE_MSG, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + +t_throttle_add_new_msg(_Config) -> + ?check_trace( + begin + ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 3000 + ), + ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), + ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ?THROTTLE_MSG1, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + +t_throttle_no_msg(_Config) -> + %% Must simply pass with no crashes + ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")), + ?assert(emqx_log_throttler:allow(warning, "no_test_throttle_msg")), + timer:sleep(10), + ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). 
+ +%%-------------------------------------------------------------------- +%% internal functions +%%-------------------------------------------------------------------- + +module_exists(Mod) -> + case erlang:module_loaded(Mod) of + true -> + true; + false -> + case code:ensure_loaded(Mod) of + ok -> true; + {module, Mod} -> true; + _ -> false + end + end. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 914470ba4..f0b0b4a12 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -909,7 +909,12 @@ fields("log") -> aliases => [file_handlers], importance => ?IMPORTANCE_HIGH } - )} + )}, + {"throttling", + sc(?R_REF("log_throttling"), #{ + desc => ?DESC("log_throttling"), + importance => ?IMPORTANCE_MEDIUM + })} ]; fields("console_handler") -> log_handler_common_confs(console, #{}); @@ -1012,6 +1017,28 @@ fields("log_burst_limit") -> } )} ]; +fields("log_throttling") -> + [ + {"time_window", + sc( + emqx_schema:duration_s(), + #{ + default => <<"1m">>, + desc => ?DESC("log_throttling_window_time"), + importance => ?IMPORTANCE_MEDIUM + } + )}, + %% A static list of msgs used in ?SLOG_THROTTLE/2,3 macro. + %% For internal (developer) use only. + {"msgs", + sc( + hoconsc:array(string()), + #{ + default => [], + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; fields("authorization") -> emqx_schema:authz_fields() ++ emqx_authz_schema:authz_fields(). @@ -1046,6 +1073,8 @@ desc("log_burst_limit") -> ?DESC("desc_log_burst_limit"); desc("authorization") -> ?DESC("desc_authorization"); +desc("log_throttling") -> + ?DESC("desc_log_throttling"); desc(_) -> undefined. 
diff --git a/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl b/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl index 2cb699036..b83d933fa 100644 --- a/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_logger_SUITE.erl @@ -35,6 +35,10 @@ level = info path = \"log/emqx.log\" } + throttling { + msgs = [] + time_window = 1m + } } "). @@ -84,7 +88,9 @@ t_log_conf(_Conf) -> <<"time_offset">> => <<"system">> }, <<"file">> => - #{<<"default">> => FileExpect} + #{<<"default">> => FileExpect}, + <<"throttling">> => + #{<<"time_window">> => <<"1m">>, <<"msgs">> => []} }, ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])), UpdateLog0 = emqx_utils_maps:deep_remove([<<"file">>, <<"default">>], ExpectLog1), diff --git a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl index bf1f358ea..ec9ae6c02 100644 --- a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl +++ b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl @@ -102,5 +102,8 @@ t_audit_log_conf(_Config) -> <<"time_offset">> => <<"system">> } }, - ?assertEqual(ExpectLog1, emqx_conf:get_raw([<<"log">>])), + %% The default value of throttling.msgs can be frequently updated, + %% remove it here, otherwise this test needs to be updated each time + %% a new throttle event is added. + ?assertEqual(ExpectLog1, maps:remove(<<"throttling">>, emqx_conf:get_raw([<<"log">>]))), ok. diff --git a/changes/ce/feat-12520.en.md b/changes/ce/feat-12520.en.md new file mode 100644 index 000000000..593b66ec4 --- /dev/null +++ b/changes/ce/feat-12520.en.md @@ -0,0 +1,2 @@ +Implement log throttling. The feature reduces the number of potentially flooding logged events by +dropping all but the first event within a configured time window. 
diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index 32828b377..ff975a7c2 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -475,6 +475,19 @@ log_burst_limit_window_time.desc: log_burst_limit_window_time.label: """Window Time""" +desc_log_throttling.label: +"""Log Throttling""" + +desc_log_throttling.desc: +"""Log throttling feature reduces the number of potentially flooding logged events by +dropping all but the first event within a configured time window.""" + +log_throttling_window_time.desc: +"""A time interval at which log throttling is applied. Defaults to 1 minute.""" + +log_throttling_window_time.label: +"""Log Throttling Window Time""" + cluster_dns_record_type.desc: """DNS record type."""