Compare commits

...

2 Commits

Author SHA1 Message Date
Thales Macedo Garitezi eb1f00f3e4 chore: bump release -> `e5.0.3-alpha.6` 2023-05-03 21:01:34 -03:00
Thales Macedo Garitezi 827be2adba fix(buffer_worker): fix inflight count when updating inflight item 2023-05-03 21:01:34 -03:00
2 changed files with 20 additions and 10 deletions

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.0.22").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.3-alpha.5").
-define(EMQX_RELEASE_EE, "5.0.3-alpha.6").
%% the HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -1432,16 +1432,16 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
ack_inflight(undefined, _Ref, _Id, _Index) ->
false;
ack_inflight(InflightTID, Ref, Id, Index) ->
Count =
{Count, Removed} =
case ets:take(InflightTID, Ref) of
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
1;
{1, true};
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
length(Batch);
{length(Batch), true};
[] ->
0
{0, false}
end,
ok = dec_inflight(InflightTID, Count),
ok = dec_inflight_remove(InflightTID, Count, Removed),
IsKnownRef = (Count > 0),
case IsKnownRef of
true ->
@ -1469,18 +1469,28 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
ok = dec_inflight(InflightTID, NumExpired).
ok = dec_inflight_update(InflightTID, NumExpired).
inc_inflight(InflightTID, Count) ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}),
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
ok.
dec_inflight(_InflightTID, 0) ->
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
ok;
dec_inflight(InflightTID, Count) when Count > 0 ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
ok;
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
%% If Count > 0, it must have been removed
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
ok.
dec_inflight_update(_InflightTID, _Count = 0) ->
ok;
dec_inflight_update(InflightTID, Count) when Count > 0 ->
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
ok.
%%==============================================================================