From 2924ec582a111a0cd1018c18b54c4f47c0d2e676 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Jul 2024 16:29:18 +0800 Subject: [PATCH] feat: add unrecoverable_resource_error throttle --- apps/emqx_conf/src/emqx_conf_schema.erl | 3 +- .../src/emqx_resource_buffer_worker.erl | 68 ++++++++++++------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4c72b74b1..505aff3e3 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -80,7 +80,8 @@ cannot_publish_to_topic_due_to_not_authorized, cannot_publish_to_topic_due_to_quota_exceeded, connection_rejected_due_to_license_limit_reached, - dropped_msg_due_to_mqueue_is_full + dropped_msg_due_to_mqueue_is_full, + unrecoverable_resource_error ]). %% Callback to upgrade config after loaded from config file but before validation. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 05d42ed1a..7419820f7 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) -> running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); running(info, Info, _St) -> - ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}), keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = St0) -> @@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) -> blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) -> - ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}), + ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}), handle_async_worker_down(Data0, Pid); blocked(info, Info, _Data) -> - ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}), + ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}), keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> @@ -981,7 +981,11 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE(error, #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }), ok end, Counters = @@ -1021,7 +1025,11 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT true -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}), + ?SLOG_THROTTLE(error, #{ + resource_id => Id, + msg => unrecoverable_resource_error, + reason => Reason + }), ok end, Counters = @@ -1141,12 +1149,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte false -> ok; true -> - ?SLOG(info, #{ - msg => "buffer_worker_dropped_expired_messages", - resource_id => Id, - worker_index => Index, - expired_count => ExpiredCount - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_dropped_expired_messages", + resource_id => Id, + worker_index => Index, + expired_count => ExpiredCount + }, + #{tag => ?TAG} + ), ok end. @@ -1556,7 +1568,7 @@ handle_async_reply1( case is_expired(ExpireAt, Now) of true -> IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), - %% evalutate metrics call here since we're not inside + %% evaluate metrics call here since we're not inside %% buffer worker IsAcked andalso begin @@ -1797,12 +1809,16 @@ append_queue(Id, Index, Q, Queries) -> ok = replayq:ack(Q1, QAckRef), Dropped = length(Items2), Counters = #{dropped_queue_full => Dropped}, - ?SLOG(info, #{ - msg => "buffer_worker_overflow", - resource_id => Id, - worker_index => Index, - dropped => Dropped - }), + ?SLOG( + info, + #{ + msg => "buffer_worker_overflow", + resource_id => Id, + worker_index => Index, + dropped => Dropped + }, + #{tag => ?TAG} + ), {Items2, Q1, Counters} end, ?tp( @@ -2236,11 +2252,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> - ?SLOG(info, #{ - id => Id, - msg => "adjusting_buffer_worker_batch_time", - new_batch_time => BatchTime - }); + ?SLOG( + info, + #{ + resource_id => Id, + msg => "adjusting_buffer_worker_batch_time", + new_batch_time => BatchTime + }, + #{tag => ?TAG} + ); true -> ok end,