diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index aaa07cf9a..601a77deb 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1440,10 +1440,10 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> 0 end, + ok = dec_inflight(InflightTID, Count), IsKnownRef = (Count > 0), case IsKnownRef of true -> - ok = dec_inflight(InflightTID, Count), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); false -> ok @@ -1466,15 +1466,16 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - ok = dec_inflight(InflightTID, NumExpired), - ok. + ok = dec_inflight(InflightTID, NumExpired). inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), ok. +dec_inflight(_InflightTID, 0) -> + ok; dec_inflight(InflightTID, Count) when Count > 0 -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok.