From d6a9d0aa484935e3f4bb1b85ca750c3e03334873 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 12 Jan 2023 16:19:02 -0300 Subject: [PATCH] fix: set queuing to 0 after buffer worker termination --- apps/emqx_resource/src/emqx_resource_worker.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index eb4921c72..deca3a5cd 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -238,10 +238,12 @@ blocked(info, Info, _Data) -> keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> - emqx_resource_metrics:inflight_set(Id, Index, 0), - emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), - gproc_pool:disconnect_worker(Id, {Id, Index}), replayq:close(Q), + emqx_resource_metrics:inflight_set(Id, Index, 0), + %% since we want volatile queues, this will be 0 after + %% termination. + emqx_resource_metrics:queuing_set(Id, Index, 0), + gproc_pool:disconnect_worker(Id, {Id, Index}), ok. code_change(_OldVsn, State, _Extra) ->