diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 31fe0e36a..a7455418d 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -38,16 +38,20 @@ ). %% NOTE: do not forget to use atom for msg and add every used msg to -%% the default value of `log.thorttling.msgs` list. +%% the default value of `log.throttling.msgs` list. -define(SLOG_THROTTLE(Level, Data), ?SLOG_THROTTLE(Level, Data, #{}) ). -define(SLOG_THROTTLE(Level, Data, Meta), + ?SLOG_THROTTLE(Level, undefined, Data, Meta) +). + +-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta), case logger:allow(Level, ?MODULE) of true -> (fun(#{msg := __Msg} = __Data) -> - case emqx_log_throttler:allow(__Msg) of + case emqx_log_throttler:allow(__Msg, UniqueKey) of true -> logger:log(Level, __Data, Meta); false -> diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 3ebc268fa..008bd1663 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -25,7 +25,7 @@ -export([start_link/0]). %% throttler API --export([allow/1]). +-export([allow/2]). %% gen_server callbacks -export([ @@ -40,23 +40,22 @@ -define(SEQ_ID(Msg), {?MODULE, Msg}). -define(NEW_SEQ, atomics:new(1, [{signed, false}])). -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))). -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). --define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). - -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom()) -> boolean(). -allow(Msg) when is_atom(Msg) -> +-spec allow(atom(), any()) -> boolean(). +allow(Msg, UniqueKey) when is_atom(Msg) -> case emqx_logger:get_primary_log_level() of debug -> true; _ -> - do_allow(Msg) + do_allow(Msg, UniqueKey) end. -spec start_link() -> startlink_ret(). @@ -68,7 +67,8 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> - ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + process_flag(trap_exit, true), + ok = lists:foreach(fun(Msg) -> new_throttler(Msg) end, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -88,14 +88,19 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> case ?GET_SEQ(Msg) of %% Should not happen, unless the static ids list is updated at run-time. undefined -> - ?NEW_THROTTLE(Msg, ?NEW_SEQ), + new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; + SeqMap when is_map(SeqMap) -> + maps:fold( + fun(Key, Ref, Acc0) -> + drop_stats(Ref, emqx_utils:format("~ts:~s", [Msg, Key]), Acc0) + end, + Acc, + SeqMap + ); SeqRef -> - Dropped = ?GET_DROPPED(SeqRef), - ok = ?RESET_SEQ(SeqRef), - ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), - maybe_add_dropped(Msg, Dropped, Acc) + drop_stats(SeqRef, Msg, Acc) end end, #{}, @@ -112,7 +117,34 @@ handle_info(Info, State) -> ?SLOG(error, #{msg => "unxpected_info", info => Info}), {noreply, State}. +drop_stats(SeqRef, Msg, Acc) -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc). + terminate(_Reason, _State) -> + lists:foreach( + fun(Msg) -> + case ?GET_SEQ(Msg) of + undefined -> + ok; + SeqMap when is_map(SeqMap) -> + maps:foreach( + fun(_, Ref) -> + ok = ?RESET_SEQ(Ref) + end, + SeqMap + ); + SeqRef -> + %% atomics don't have erase API... + %% (if nobody hold the ref, the atomics should erase automatically?) + ok = ?RESET_SEQ(SeqRef) + end, + ?ERASE_SEQ(Msg) + end, + ?MSGS_LIST + ), ok. code_change(_OldVsn, State, _Extra) -> @@ -122,17 +154,27 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%-------------------------------------------------------------------- -do_allow(Msg) -> +do_allow(Msg, UniqueKey) -> case persistent_term:get(?SEQ_ID(Msg), undefined) of undefined -> %% This is either a race condition (emqx_log_throttler is not started yet) %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is %% not added to the default value of `log.throttling.msgs`. - ?SLOG(info, #{ - msg => "missing_log_throttle_sequence", + ?SLOG(debug, #{ + msg => "log_throttle_disabled", throttled_msg => Msg }), true; + %% e.g: unrecoverable msg throttle according resource_id + SeqMap when is_map(SeqMap) -> + case maps:find(UniqueKey, SeqMap) of + {ok, SeqRef} -> + ?IS_ALLOWED(SeqRef); + error -> + SeqRef = ?NEW_SEQ, + new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}), + true + end; SeqRef -> ?IS_ALLOWED(SeqRef) end. @@ -154,3 +196,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) -> schedule_refresh(PeriodMs) -> ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), erlang:send_after(PeriodMs, ?MODULE, refresh). + +new_throttler(unrecoverable_resource_error = Msg) -> + persistent_term:put(?SEQ_ID(Msg), #{}); +new_throttler(Msg) -> + persistent_term:put(?SEQ_ID(Msg), ?NEW_SEQ). + +new_throttler(Msg, Map) -> + persistent_term:put(?SEQ_ID(Msg), Map). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 8b3ac0207..23150a3b1 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -26,6 +26,7 @@ %% Have to use real msgs, as the schema is guarded by enum. -define(THROTTLE_MSG, authorization_permission_denied). -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error). -define(TIME_WINDOW, <<"1s">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -59,6 +60,11 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), emqx_config:delete_override_conf_files(). +init_per_testcase(t_throttle_recoverable_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}), + Config; init_per_testcase(t_throttle_add_new_msg, Config) -> ok = snabbkaffe:start_trace(), [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), @@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) -> ok = snabbkaffe:start_trace(), Config. +end_per_testcase(t_throttle_recoverable_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; end_per_testcase(t_throttle_add_new_msg, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), @@ -101,8 +111,8 @@ t_throttle(_Config) -> 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -115,14 +125,48 @@ t_throttle(_Config) -> [] ). +t_throttle_recoverable_msg(_Config) -> + ResourceId = <<"resource_id">>, + ThrottledMsg = emqx_utils:format("~ts:~s", [?THROTTLE_UNRECOVERABLE_MSG, ResourceId]), + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. + {ok, _} = ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG}, + 5000 + ), + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId), + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg + }, + 5000 + ), + + ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + t_throttle_add_new_msg(_Config) -> ?check_trace( begin {ok, _} = ?block_until( #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -137,8 +181,8 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), timer:sleep(10), ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). @@ -168,8 +212,8 @@ t_throttle_debug_primary_level(_Config) -> #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -187,10 +231,13 @@ t_throttle_debug_primary_level(_Config) -> %%-------------------------------------------------------------------- events(Msg) -> - events(100, Msg). + events(100, Msg, undefined). -events(N, Msg) -> - [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)]. +events(Msg, Id) -> + events(100, Msg, Id). + +events(N, Msg, Id) -> + [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)]. module_exists(Mod) -> case erlang:module_loaded(Mod) of diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 6603f7708..8c2bb39a1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -167,12 +167,4 @@ ). -define(TAG, "RESOURCE"). --define(LOG_LEVEL(_L_), - case _L_ of - true -> info; - false -> warning - end -). --define(TAG, "RESOURCE"). - -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 7419820f7..a203247e9 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -981,11 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG_THROTTLE(error, #{ - resource_id => Id, - msg => unrecoverable_resource_error, - reason => Reason - }), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1025,11 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG_THROTTLE(error, #{ - resource_id => Id, - msg => unrecoverable_resource_error, - reason => Reason - }), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters =