From eba627b365184954bd9657cf995228c9b7b4d10e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 3 May 2023 14:14:00 -0300 Subject: [PATCH] fix(buffer_worker): fix inflight count when updating inflight item --- .../src/emqx_resource_buffer_worker.erl | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 266b8df69..d9cca8f03 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1432,16 +1432,16 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when ack_inflight(undefined, _Ref, _Id, _Index) -> false; ack_inflight(InflightTID, Ref, Id, Index) -> - Count = + {Count, Removed} = case ets:take(InflightTID, Ref) of [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> - 1; + {1, true}; [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> - length(Batch); + {length(Batch), true}; [] -> - 0 + {0, false} end, - ok = dec_inflight(InflightTID, Count), + ok = dec_inflight_remove(InflightTID, Count, Removed), IsKnownRef = (Count > 0), case IsKnownRef of true -> @@ -1469,18 +1469,28 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> %% used to update a batch after dropping expired individual queries. update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - ok = dec_inflight(InflightTID, NumExpired). + ok = dec_inflight_update(InflightTID, NumExpired). inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}), ok. -dec_inflight(_InflightTID, 0) -> +dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) -> ok; -dec_inflight(InflightTID, Count) when Count > 0 -> - _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), +dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + ok; +dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> + %% If Count > 0, it must have been removed + _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + ok. + +dec_inflight_update(_InflightTID, _Count = 0) -> + ok; +dec_inflight_update(InflightTID, Count) when Count > 0 -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok. %%==============================================================================