From e08425e67d82e357adb0250df96e0999baae0f4e Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 26 Jul 2024 15:16:07 +0200 Subject: [PATCH] refactor(log-throttler): remove unnecessary code there is no need to reset counters before erasing --- apps/emqx/src/emqx_log_throttler.erl | 50 +++++++++------------ apps/emqx/test/emqx_log_throttler_SUITE.erl | 11 +++-- 2 files changed, 28 insertions(+), 33 deletions(-) diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 008bd1663..928580e2b 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -49,8 +49,15 @@ -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom(), any()) -> boolean(). -allow(Msg, UniqueKey) when is_atom(Msg) -> +%% @doc Check if a throttled log message is allowed to pass down to the logger this time. +%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined' +%% for predefined message IDs. +%% For relatively static resources created from configurations such as data integration +%% resource IDs `UniqueKey' should be of `binary()' type. +-spec allow(atom(), undefined | binary()) -> boolean(). +allow(Msg, UniqueKey) when + is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined) +-> case emqx_logger:get_primary_log_level() of debug -> true; @@ -68,7 +75,7 @@ start_link() -> init([]) -> process_flag(trap_exit, true), - ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST), + ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -86,15 +93,16 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> DroppedStats = lists:foldl( fun(Msg, Acc) -> case ?GET_SEQ(Msg) of - %% Should not happen, unless the static ids list is updated at run-time. undefined -> + %% Should not happen, unless the static ids list is updated at run-time. new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; SeqMap when is_map(SeqMap) -> maps:fold( fun(Key, Ref, Acc0) -> - drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0) + ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]), + drop_stats(Ref, ID, Acc0) end, Acc, SeqMap @@ -124,27 +132,9 @@ drop_stats(SeqRef, Msg, Acc) -> maybe_add_dropped(Msg, Dropped, Acc). terminate(_Reason, _State) -> - lists:foreach( - fun(Msg) -> - case ?GET_SEQ(Msg) of - undefined -> - ok; - SeqMap when is_map(SeqMap) -> - maps:foreach( - fun(_, Ref) -> - ok = ?RESET_SEQ(Ref) - end, - SeqMap - ); - SeqRef -> - %% atomics don't have erase API... - %% (if nobody hold the ref, the atomics should erase automatically?) - ok = ?RESET_SEQ(SeqRef) - end, - ?ERASE_SEQ(Msg) - end, - ?MSGS_LIST - ), + %% atomics do not have delete/remove/release/deallocate API + %% after the reference is garbage-collected the resource is released + lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST), ok. code_change(_OldVsn, State, _Extra) -> @@ -198,9 +188,9 @@ schedule_refresh(PeriodMs) -> erlang:send_after(PeriodMs, ?MODULE, refresh). new_throttler(unrecoverable_resource_error = Msg) -> - persistent_term:put(?SEQ_ID(Msg), #{}); + new_throttler(Msg, #{}); new_throttler(Msg) -> - persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ). + new_throttler(Msg, ?NEW_SEQ). -new_throttler(Msg, Map) -> - persistent_term:put(?SEQ_ID(Msg), Map). +new_throttler(Msg, AtomicOrEmptyMap) -> + persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 23150a3b1..f95d62969 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -127,7 +127,7 @@ t_throttle(_Config) -> t_throttle_recoverable_msg(_Config) -> ResourceId = <<"resource_id">>, - ThrottledMsg = emqx_utils:format("~ts:~s", [?THROTTLE_UNRECOVERABLE_MSG, ResourceId]), + ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]), ?check_trace( begin %% Warm-up and block to increase the probability that next events @@ -181,10 +181,15 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes + Pid = erlang:whereis(emqx_log_throttler), ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), - timer:sleep(10), - ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). + %% assert process is not restarted + ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)), + %% make a gen_call to ensure the process is alive + %% note: this call result in an 'unexpected_call' error log. + ?assertEqual(ignored, gen_server:call(Pid, probe)), + ok. t_update_time_window(_Config) -> ?check_trace(