diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 7a0bcaea9..76566ce47 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -927,6 +927,7 @@ merge_counters(OldCounters, DeltaCounters) -> flush_metrics(Data = #{id := Id, counters := Counters}) -> bump_counters(Id, Counters), set_gauges(Data), + log_expired_message_count(Data), ensure_metrics_flush_timer(Data#{counters := #{}}). -spec ensure_metrics_flush_timer(data()) -> data(). @@ -966,6 +967,22 @@ do_bump_counters1(dropped_resource_not_found, Val, Id) -> do_bump_counters1(dropped_resource_stopped, Val, Id) -> emqx_resource_metrics:dropped_resource_stopped_inc(Id, Val). +-spec log_expired_message_count(data()) -> ok. +log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counters}) -> + ExpiredCount = maps:get(dropped_expired, Counters, 0), + case ExpiredCount > 0 of + false -> + ok; + true -> + ?SLOG(info, #{ + msg => "buffer_worker_dropped_expired_messages", + resource_id => Id, + worker_index => Index, + expired_count => ExpiredCount + }), + ok + end. + -spec set_gauges(data()) -> ok. set_gauges(_Data = #{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) -> emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), diff --git a/changes/ce/feat-11115.en.md b/changes/ce/feat-11115.en.md new file mode 100644 index 000000000..8030926cf --- /dev/null +++ b/changes/ce/feat-11115.en.md @@ -0,0 +1 @@ +Added info logs to indicate when buffered messages are dropped due to time-to-live (TTL) expiration.