Merge pull request #12560 from SergeTupchiy/EMQX-11530-log-throttling-followup-fixes

fix: disable log throttling if primary log level is debug
This commit is contained in:
SergeTupchiy 2024-02-21 22:02:32 +02:00 committed by GitHub
commit 8e47503f7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 76 additions and 35 deletions

View File

@ -47,7 +47,7 @@
).
-define(SLOG_THROTTLE(Level, Data, Meta),
case emqx_log_throttler:allow(Level, maps:get(msg, Data)) of
case emqx_log_throttler:allow(maps:get(msg, Data)) of
true ->
?SLOG(Level, Data, Meta);
false ->

View File

@ -25,7 +25,7 @@
-export([start_link/0]).
%% throttler API
-export([allow/2]).
-export([allow/1]).
%% gen_server callbacks
-export([
@ -50,23 +50,13 @@
-define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
-define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
-spec allow(logger:level(), atom()) -> boolean().
allow(debug, _Msg) ->
true;
allow(_Level, Msg) when is_atom(Msg) ->
Seq = persistent_term:get(?SEQ_ID(Msg), undefined),
case Seq of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`.
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
throttled_msg => Msg
}),
-spec allow(atom()) -> boolean().
allow(Msg) when is_atom(Msg) ->
case emqx_logger:get_primary_log_level() of
debug ->
true;
SeqRef ->
?IS_ALLOWED(SeqRef)
_ ->
do_allow(Msg)
end.
-spec start_link() -> startlink_ret().
@ -132,6 +122,21 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%--------------------------------------------------------------------
do_allow(Msg) ->
case persistent_term:get(?SEQ_ID(Msg), undefined) of
undefined ->
%% This is either a race condition (emqx_log_throttler is not started yet)
%% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
%% not added to the default value of `log.throttling.msgs`.
?SLOG(info, #{
msg => "missing_log_throttle_sequence",
throttled_msg => Msg
}),
true;
SeqRef ->
?IS_ALLOWED(SeqRef)
end.
maybe_add_dropped(Msg, Dropped, DroppedAcc) when Dropped > 0 ->
DroppedAcc#{Msg => Dropped};
maybe_add_dropped(_Msg, _Dropped, DroppedAcc) ->

View File

@ -64,6 +64,10 @@ init_per_testcase(t_throttle_add_new_msg, Config) ->
[?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
{ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG1 | Conf], #{}),
Config;
init_per_testcase(t_throttle_debug_primary_level, Config) ->
ok = snabbkaffe:start_trace(),
Level = emqx_logger:get_primary_log_level(),
[{prev_log_level, Level} | Config];
init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
@ -76,6 +80,9 @@ end_per_testcase(t_update_time_window, _Config) ->
ok = snabbkaffe:stop(),
{ok, _} = emqx_conf:update([log, throttling, time_window], ?TIME_WINDOW, #{}),
ok;
end_per_testcase(t_throttle_debug_primary_level, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_logger:set_primary_log_level(?config(prev_log_level, Config));
end_per_testcase(_TC, _Config) ->
ok = snabbkaffe:stop().
@ -88,18 +95,14 @@ t_throttle(_Config) ->
begin
%% Warm-up and block to increase the probability that next events
%% will be in the same throttling time window.
lists:foreach(
fun(_) -> emqx_log_throttler:allow(warning, ?THROTTLE_MSG) end,
lists:seq(1, 100)
),
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_MSG),
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG)),
%% Debug is always allowed
?assert(emqx_log_throttler:allow(debug, ?THROTTLE_MSG)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -115,11 +118,11 @@ t_throttle(_Config) ->
t_throttle_add_new_msg(_Config) ->
?check_trace(
begin
?block_until(
{ok, _} = ?block_until(
#{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
),
?assert(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(warning, ?THROTTLE_MSG1)),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
@ -134,15 +137,15 @@ t_throttle_add_new_msg(_Config) ->
t_throttle_no_msg(_Config) ->
%% Must simply pass with no crashes
?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(warning, no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
timer:sleep(10),
?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
t_update_time_window(_Config) ->
?check_trace(
begin
?wait_async_action(
{_, {ok, _}} = ?wait_async_action(
emqx_conf:update([log, throttling, time_window], <<"2s">>, #{}),
#{?snk_kind := log_throttler_sched_refresh, new_period_ms := 2000},
5000
@ -153,10 +156,42 @@ t_update_time_window(_Config) ->
[]
).
t_throttle_debug_primary_level(_Config) ->
?check_trace(
begin
ok = emqx_logger:set_primary_log_level(debug),
?assert(lists:all(fun(Allow) -> Allow =:= true end, events(?THROTTLE_MSG))),
ok = emqx_logger:set_primary_log_level(warning),
{_, {ok, _}} = ?wait_async_action(
events(?THROTTLE_MSG),
#{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
5000
),
?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
{ok, _} = ?block_until(
#{
?snk_kind := log_throttler_dropped,
throttled_msg := ?THROTTLE_MSG,
dropped_count := 1
},
3000
)
end,
[]
).
%%--------------------------------------------------------------------
%% internal functions
%%--------------------------------------------------------------------
events(Msg) ->
events(100, Msg).
events(N, Msg) ->
[emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
module_exists(Mod) ->
case erlang:module_loaded(Mod) of
true ->

View File

@ -480,7 +480,8 @@ desc_log_throttling.label:
desc_log_throttling.desc:
"""Log throttling feature reduces the number of potentially flooding logged events by
dropping all but the first event within a configured time window."""
dropping all but the first event within a configured time window.
The throttling is automatically disabled if `console` or `file` log level is set to debug."""
log_throttling_time_window.desc:
"""For throttled messages, only log 1 in each time window."""