fix(buffer_worker): fix inflight count when updating inflight item
This commit is contained in:
parent
a21dee441b
commit
827be2adba
|
@ -1432,16 +1432,16 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
|
||||||
ack_inflight(undefined, _Ref, _Id, _Index) ->
|
ack_inflight(undefined, _Ref, _Id, _Index) ->
|
||||||
false;
|
false;
|
||||||
ack_inflight(InflightTID, Ref, Id, Index) ->
|
ack_inflight(InflightTID, Ref, Id, Index) ->
|
||||||
Count =
|
{Count, Removed} =
|
||||||
case ets:take(InflightTID, Ref) of
|
case ets:take(InflightTID, Ref) of
|
||||||
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
||||||
1;
|
{1, true};
|
||||||
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
|
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
|
||||||
length(Batch);
|
{length(Batch), true};
|
||||||
[] ->
|
[] ->
|
||||||
0
|
{0, false}
|
||||||
end,
|
end,
|
||||||
ok = dec_inflight(InflightTID, Count),
|
ok = dec_inflight_remove(InflightTID, Count, Removed),
|
||||||
IsKnownRef = (Count > 0),
|
IsKnownRef = (Count > 0),
|
||||||
case IsKnownRef of
|
case IsKnownRef of
|
||||||
true ->
|
true ->
|
||||||
|
@ -1469,18 +1469,28 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
|
||||||
%% used to update a batch after dropping expired individual queries.
|
%% used to update a batch after dropping expired individual queries.
|
||||||
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
|
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
|
||||||
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
|
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
|
||||||
ok = dec_inflight(InflightTID, NumExpired).
|
ok = dec_inflight_update(InflightTID, NumExpired).
|
||||||
|
|
||||||
inc_inflight(InflightTID, Count) ->
|
inc_inflight(InflightTID, Count) ->
|
||||||
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
|
||||||
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
dec_inflight(_InflightTID, 0) ->
|
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
||||||
ok;
|
ok;
|
||||||
dec_inflight(InflightTID, Count) when Count > 0 ->
|
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
||||||
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
|
||||||
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
||||||
|
ok;
|
||||||
|
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
||||||
|
%% If Count > 0, it must have been removed
|
||||||
|
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
||||||
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
dec_inflight_update(_InflightTID, _Count = 0) ->
|
||||||
|
ok;
|
||||||
|
dec_inflight_update(InflightTID, Count) when Count > 0 ->
|
||||||
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%==============================================================================
|
%%==============================================================================
|
||||||
|
|
Loading…
Reference in New Issue