diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index b25725c41..bb4eee57d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -468,7 +468,10 @@ flush(Data0) -> queue := Q0 } = Data0, Data1 = cancel_flush_timer(Data0), - case {queue_count(Q0), is_inflight_full(InflightTID)} of + CurrentCount = queue_count(Q0), + IsFull = is_inflight_full(InflightTID), + ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), + case {CurrentCount, IsFull} of {0, _} -> {keep_state, Data1}; {_, true} -> @@ -595,8 +598,12 @@ do_flush( result => Result } ), - case queue_count(Q1) > 0 of + CurrentCount = queue_count(Q1), + case CurrentCount > 0 of true -> + ?tp(buffer_worker_flush_ack_reflush, #{ + batch_or_query => Request, result => Result, queue_count => CurrentCount + }), flush_worker(self()); false -> ok @@ -666,19 +673,26 @@ do_flush(Data0, #{ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + CurrentCount = queue_count(Q1), ?tp( buffer_worker_flush_ack, #{ batch_or_query => Batch, - result => Result + result => Result, + queue_count => CurrentCount } ), - CurrentCount = queue_count(Q1), Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> Data1; {true, true} -> + ?tp(buffer_worker_flush_ack_reflush, #{ + batch_or_query => Batch, + result => Result, + queue_count => CurrentCount, + batch_size => BatchSize + }), flush_worker(self()), Data1; {true, false} ->