diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 31fe0e36a..a7455418d 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -38,16 +38,20 @@ ). %% NOTE: do not forget to use atom for msg and add every used msg to -%% the default value of `log.thorttling.msgs` list. +%% the default value of `log.throttling.msgs` list. -define(SLOG_THROTTLE(Level, Data), ?SLOG_THROTTLE(Level, Data, #{}) ). -define(SLOG_THROTTLE(Level, Data, Meta), + ?SLOG_THROTTLE(Level, undefined, Data, Meta) +). + +-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta), case logger:allow(Level, ?MODULE) of true -> (fun(#{msg := __Msg} = __Data) -> - case emqx_log_throttler:allow(__Msg) of + case emqx_log_throttler:allow(__Msg, UniqueKey) of true -> logger:log(Level, __Data, Meta); false -> diff --git a/apps/emqx/src/emqx_log_throttler.erl b/apps/emqx/src/emqx_log_throttler.erl index 3ebc268fa..928580e2b 100644 --- a/apps/emqx/src/emqx_log_throttler.erl +++ b/apps/emqx/src/emqx_log_throttler.erl @@ -25,7 +25,7 @@ -export([start_link/0]). %% throttler API --export([allow/1]). +-export([allow/2]). %% gen_server callbacks -export([ @@ -40,23 +40,29 @@ -define(SEQ_ID(Msg), {?MODULE, Msg}). -define(NEW_SEQ, atomics:new(1, [{signed, false}])). -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)). +-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))). -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)). -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)). -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1). -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1). --define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)). - -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])). -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))). --spec allow(atom()) -> boolean(). -allow(Msg) when is_atom(Msg) -> +%% @doc Check if a throttled log message is allowed to pass down to the logger this time. +%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined' +%% for predefined message IDs. +%% For relatively static resources created from configurations such as data integration +%% resource IDs `UniqueKey' should be of `binary()' type. +-spec allow(atom(), undefined | binary()) -> boolean(). +allow(Msg, UniqueKey) when + is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined) +-> case emqx_logger:get_primary_log_level() of debug -> true; _ -> - do_allow(Msg) + do_allow(Msg, UniqueKey) end. -spec start_link() -> startlink_ret(). @@ -68,7 +74,8 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> - ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST), + process_flag(trap_exit, true), + ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST), CurrentPeriodMs = ?TIME_WINDOW_MS, TimerRef = schedule_refresh(CurrentPeriodMs), {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}. @@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) -> DroppedStats = lists:foldl( fun(Msg, Acc) -> case ?GET_SEQ(Msg) of - %% Should not happen, unless the static ids list is updated at run-time. undefined -> - ?NEW_THROTTLE(Msg, ?NEW_SEQ), + %% Should not happen, unless the static ids list is updated at run-time. + new_throttler(Msg), ?tp(log_throttler_new_msg, #{throttled_msg => Msg}), Acc; + SeqMap when is_map(SeqMap) -> + maps:fold( + fun(Key, Ref, Acc0) -> + ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]), + drop_stats(Ref, ID, Acc0) + end, + Acc, + SeqMap + ); SeqRef -> - Dropped = ?GET_DROPPED(SeqRef), - ok = ?RESET_SEQ(SeqRef), - ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), - maybe_add_dropped(Msg, Dropped, Acc) + drop_stats(SeqRef, Msg, Acc) end end, #{}, @@ -112,7 +125,16 @@ handle_info(Info, State) -> ?SLOG(error, #{msg => "unxpected_info", info => Info}), {noreply, State}. +drop_stats(SeqRef, Msg, Acc) -> + Dropped = ?GET_DROPPED(SeqRef), + ok = ?RESET_SEQ(SeqRef), + ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}), + maybe_add_dropped(Msg, Dropped, Acc). + terminate(_Reason, _State) -> + %% atomics do not have delete/remove/release/deallocate API + %% after the reference is garbage-collected the resource is released + lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST), ok. code_change(_OldVsn, State, _Extra) -> @@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%-------------------------------------------------------------------- -do_allow(Msg) -> +do_allow(Msg, UniqueKey) -> case persistent_term:get(?SEQ_ID(Msg), undefined) of undefined -> %% This is either a race condition (emqx_log_throttler is not started yet) %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is %% not added to the default value of `log.throttling.msgs`. - ?SLOG(info, #{ - msg => "missing_log_throttle_sequence", + ?SLOG(debug, #{ + msg => "log_throttle_disabled", throttled_msg => Msg }), true; + %% e.g: unrecoverable msg throttle according resource_id + SeqMap when is_map(SeqMap) -> + case maps:find(UniqueKey, SeqMap) of + {ok, SeqRef} -> + ?IS_ALLOWED(SeqRef); + error -> + SeqRef = ?NEW_SEQ, + new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}), + true + end; SeqRef -> ?IS_ALLOWED(SeqRef) end. @@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) -> schedule_refresh(PeriodMs) -> ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}), erlang:send_after(PeriodMs, ?MODULE, refresh). + +new_throttler(unrecoverable_resource_error = Msg) -> + new_throttler(Msg, #{}); +new_throttler(Msg) -> + new_throttler(Msg, ?NEW_SEQ). + +new_throttler(Msg, AtomicOrEmptyMap) -> + persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap). diff --git a/apps/emqx/test/emqx_log_throttler_SUITE.erl b/apps/emqx/test/emqx_log_throttler_SUITE.erl index 8b3ac0207..f95d62969 100644 --- a/apps/emqx/test/emqx_log_throttler_SUITE.erl +++ b/apps/emqx/test/emqx_log_throttler_SUITE.erl @@ -26,6 +26,7 @@ %% Have to use real msgs, as the schema is guarded by enum. -define(THROTTLE_MSG, authorization_permission_denied). -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized). +-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error). -define(TIME_WINDOW, <<"1s">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -59,6 +60,11 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)), emqx_config:delete_override_conf_files(). +init_per_testcase(t_throttle_recoverable_msg, Config) -> + ok = snabbkaffe:start_trace(), + [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}), + Config; init_per_testcase(t_throttle_add_new_msg, Config) -> ok = snabbkaffe:start_trace(), [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]), @@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) -> ok = snabbkaffe:start_trace(), Config. +end_per_testcase(t_throttle_recoverable_msg, _Config) -> + ok = snabbkaffe:stop(), + {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), + ok; end_per_testcase(t_throttle_add_new_msg, _Config) -> ok = snabbkaffe:stop(), {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}), @@ -101,8 +111,8 @@ t_throttle(_Config) -> 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -115,14 +125,48 @@ t_throttle(_Config) -> [] ). +t_throttle_recoverable_msg(_Config) -> + ResourceId = <<"resource_id">>, + ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]), + ?check_trace( + begin + %% Warm-up and block to increase the probability that next events + %% will be in the same throttling time window. + {ok, _} = ?block_until( + #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG}, + 5000 + ), + {_, {ok, _}} = ?wait_async_action( + events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId), + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg + }, + 5000 + ), + + ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)), + {ok, _} = ?block_until( + #{ + ?snk_kind := log_throttler_dropped, + throttled_msg := ThrottledMsg, + dropped_count := 1 + }, + 3000 + ) + end, + [] + ). + t_throttle_add_new_msg(_Config) -> ?check_trace( begin {ok, _} = ?block_until( #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) -> t_throttle_no_msg(_Config) -> %% Must simply pass with no crashes - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - ?assert(emqx_log_throttler:allow(no_test_throttle_msg)), - timer:sleep(10), - ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))). + Pid = erlang:whereis(emqx_log_throttler), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)), + %% assert process is not restarted + ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)), + %% make a gen_call to ensure the process is alive + %% note: this call result in an 'unexpected_call' error log. + ?assertEqual(ignored, gen_server:call(Pid, probe)), + ok. t_update_time_window(_Config) -> ?check_trace( @@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) -> #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG}, 5000 ), - ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)), - ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)), + ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), + ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)), {ok, _} = ?block_until( #{ ?snk_kind := log_throttler_dropped, @@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) -> %%-------------------------------------------------------------------- events(Msg) -> - events(100, Msg). + events(100, Msg, undefined). -events(N, Msg) -> - [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)]. +events(Msg, Id) -> + events(100, Msg, Id). + +events(N, Msg, Id) -> + [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)]. module_exists(Mod) -> case erlang:module_loaded(Mod) of diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4c72b74b1..505aff3e3 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -80,7 +80,8 @@ cannot_publish_to_topic_due_to_not_authorized, cannot_publish_to_topic_due_to_quota_exceeded, connection_rejected_due_to_license_limit_reached, - dropped_msg_due_to_mqueue_is_full + dropped_msg_due_to_mqueue_is_full, + unrecoverable_resource_error ]). %% Callback to upgrade config after loaded from config file but before validation. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 05d42ed1a..a203247e9 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) -> running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); running(info, Info, _St) -> - ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}), keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = St0) -> @@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) -> blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); blocked(info, Info, _Data) -> - ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}), keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> @@ -981,7 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1021,7 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE( + error, + Id, + #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }, + #{tag => ?TAG} + ), ok end, Counters = @@ -1141,12 +1159,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte false -> ok; true -> - ?SLOG(info, #{ - msg => "buffer_worker_dropped_expired_messages", - resource_id => Id, - worker_index => Index, - expired_count => ExpiredCount - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_dropped_expired_messages", + resource_id => Id, + worker_index => Index, + expired_count => ExpiredCount + }, + #{tag => ?TAG} + ), ok end. @@ -1556,7 +1578,7 @@ handle_async_reply1( case is_expired(ExpireAt, Now) of true -> IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), - %% evalutate metrics call here since we're not inside + %% evaluate metrics call here since we're not inside %% buffer worker IsAcked andalso begin @@ -1797,12 +1819,16 @@ append_queue(Id, Index, Q, Queries) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), Counters = #{dropped_queue_full => Dropped}, - ?SLOG(info, #{ - msg => "buffer_worker_overflow", - resource_id => Id, - worker_index => Index, - dropped => Dropped - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_overflow", + resource_id => Id, + worker_index => Index, + dropped => Dropped + }, + #{tag => ?TAG} + ), {Items2, Q1, Counters} end, ?tp( @@ -2236,11 +2262,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> - ?SLOG(info, #{ - id => Id, - msg => "adjusting_buffer_worker_batch_time", - new_batch_time => BatchTime - }); + ?SLOG( + info, + #{ + resource_id => Id, + msg => "adjusting_buffer_worker_batch_time", + new_batch_time => BatchTime + }, + #{tag => ?TAG} + ); true -> ok end, diff --git a/changes/ce/feat-13528.en.md b/changes/ce/feat-13528.en.md new file mode 100644 index 000000000..f761e9565 --- /dev/null +++ b/changes/ce/feat-13528.en.md @@ -0,0 +1 @@ +Add log throttling for data integration unrecoverable errors.