refactor(buffer_worker): less defensive on inflight counter decrement

This commit is contained in:
Zaiming (Stone) Shi 2023-02-23 21:23:10 +01:00
parent 7a6465e2cf
commit a10dbba084
1 changed files with 5 additions and 4 deletions

View File

@ -1440,10 +1440,10 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
[] -> [] ->
0 0
end, end,
ok = dec_inflight(InflightTID, Count),
IsKnownRef = (Count > 0), IsKnownRef = (Count > 0),
case IsKnownRef of case IsKnownRef of
true -> true ->
ok = dec_inflight(InflightTID, Count),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID));
false -> false ->
ok ok
@ -1466,15 +1466,16 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
ok. ok.
%% used to update a batch after dropping expired individual queries. %% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
ok = dec_inflight(InflightTID, NumExpired), ok = dec_inflight(InflightTID, NumExpired).
ok.
inc_inflight(InflightTID, Count) -> inc_inflight(InflightTID, Count) ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
ok. ok.
dec_inflight(_InflightTID, 0) ->
ok;
dec_inflight(InflightTID, Count) when Count > 0 -> dec_inflight(InflightTID, Count) when Count > 0 ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
ok. ok.