diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index f39c88441..c7e6b3fcd 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -47,7 +47,7 @@ ). -define(SLOG_THROTTLE(Level, Data, Meta), - case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of + case emqx_log_throttler:allow(maps:get(msg, Data)) of true -> ?SLOG(Level, Data, Meta); false -> diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index ef29d5a79..3ebc268fa 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -25,7 +25,7 @@ -export([start_link/0]). %% throttler API --export([allow/2]). +-export([allow/1]). %% gen_server callbacks -export([ @@ -50,23 +50,13 @@ -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(logger:level(), atom()) -> boolean(). -allow(debug, _Msg) -> - true; -allow(_Level, Msg) when is_atom(Msg) -> - Seq = persistent_term:get(?SEQ_ID(Msg), undefined), - case Seq of - undefined -> - %% This is either a race condition (emqx_log_throttler is not started yet) - %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is - %% not added to the default value of `log.throttling.msgs`. - ?SLOG(info, #{ - msg => "missing_log_throttle_sequence", - throttled_msg => Msg - }), +-spec allow(atom()) -> boolean(). +allow(Msg) when is_atom(Msg) -> + case emqx_logger:get_primary_log_level() of + debug -> true; - SeqRef -> - ?IS_ALLOWED(SeqRef) + _ -> + do_allow(Msg) end. -spec start_link() -> startlink_ret(). @@ -132,6 +122,21 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%-------------------------------------------------------------------- +do_allow(Msg) -> + case persistent_term:get(?SEQ_ID(Msg), undefined) of + undefined -> + %% This is either a race condition (emqx_log_throttler is not started yet) + %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is + %% not added to the default value of `log.throttling.msgs`. + ?SLOG(info, #{ + msg => "missing_log_throttle_sequence", + throttled_msg => Msg + }), + true; + SeqRef -> + ?IS_ALLOWED(SeqRef) + end. + maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 -> DroppedAcc#{Msg => Dropped}; maybe_add_dropped(_Msg, _Dropped, DroppedAcc) -> diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 441ef2d95..8b3ac0207 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -64,6 +64,10 @@ init_per_testcase(t_throttle_add_new_msg, Config) -> [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}), Config; +init_per_testcase(t_throttle_debug_primary_level, Config) -> + ok = snabbkaffe:start_trace(), + Level = emqx_logger:get_primary_log_level(), + [{prev_log_level, Level} | Config]; init_per_testcase(_TC, Config) -> ok = snabbkaffe:start_trace(), Config. @@ -76,6 +80,9 @@ end_per_testcase(t_update_time_window, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}), ok; +end_per_testcase(t_throttle_debug_primary_level, Config) -> + ok = snabbkaffe:stop(), + ok = emqx_logger:set_primary_log_level(?config(prev_log_level, Config)); end_per_testcase(_TC, _Config) -> ok = snabbkaffe:stop(). @@ -88,18 +95,14 @@ t_throttle(_Config) -> begin %% Warm-up and block to increase the probability that next events %% will be in the same throttling time window. - lists:foreach( - fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end, - lists:seq(1, 100) - ), - {ok, _} = ?block_until( - #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_MSG), + #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, + 5000 ), - ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)), - %% Debug is always allowed - ?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -115,11 +118,11 @@ t_throttle(_Config) -> t_throttle_add_new_msg(_Config) -> ?check_trace( begin - ?block_until( + {ok, _} = ?block_until( #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), - ?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), - ?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -134,15 +137,15 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), - ?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), timer:sleep(10), ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). t_update_time_window(_Config) -> ?check_trace( begin - ?wait_async_action( + {_, {ok, _}} = ?wait_async_action( emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}), #{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000}, 5000 @@ -153,10 +156,42 @@ t_update_time_window(_Config) -> [] ). +t_throttle_debug_primary_level(_Config) -> + ?check_trace( + begin + ok = emqx_logger:set_primary_log_level(debug), + ?assert(lists:all(fun(Allow) -> Allow =:= true end, events(?THROTTLE_MSG))), + + ok = emqx_logger:set_primary_log_level(warning), + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_MSG), + #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, + 5000 + ), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ?THROTTLE_MSG, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + %%-------------------------------------------------------------------- %% internal functions %%-------------------------------------------------------------------- +events(Msg) -> + events(100, Msg). + +events(N, Msg) -> + [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)]. + module_exists(Mod) -> case erlang:module_loaded(Mod) of true -> diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index 889bfafa5..4477b6a54 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -480,7 +480,8 @@ desc_log_throttling.label: desc_log_throttling.desc: """Log throttling feature reduces the number of potentially flooding logged events by -dropping all but the first event within a configured time window.""" +dropping all but the first event within a configured time window. +The throttling is automatically disabled if `console` or `file` log level is set to debug.""" log_throttling_time_window.desc: """For throttled messages, only log 1 in each time window."""