refactor: rename ack fn

This commit is contained in:
Thales Macedo Garitezi 2023-01-13 10:17:15 -03:00
parent 196bf1c5ba
commit 7401d6f0ce
1 changed files with 17 additions and 17 deletions

View File

@ -313,14 +313,14 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
{keep_state, Data0, {state_timeout, ResumeT, resume}}; {keep_state, Data0, {state_timeout, ResumeT, resume}};
%% Send ok or failed but the resource is working %% Send ok or failed but the resource is working
{false, PostFn} -> {false, PostFn} ->
IsDropped = inflight_drop(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
%% we need to defer bumping the counters after %% we need to defer bumping the counters after
%% `inflight_drop' to avoid the race condition when an %% `inflight_drop' to avoid the race condition when an
%% inflight request might get completed concurrently with %% inflight request might get completed concurrently with
%% the retry, bumping them twice. Since both inflight %% the retry, bumping them twice. Since both inflight
%% requests (repeated and original) have the safe `Ref', %% requests (repeated and original) have the safe `Ref',
%% we bump the counter when removing it from the table. %% we bump the counter when removing it from the table.
IsDropped andalso PostFn(), IsAcked andalso PostFn(),
?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}),
resume_from_blocked(Data0) resume_from_blocked(Data0)
end. end.
@ -438,7 +438,7 @@ do_flush(
%% Success; just ack. %% Success; just ack.
false -> false ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
case queue_count(Q1) > 0 of case queue_count(Q1) > 0 of
true -> true ->
@ -478,7 +478,7 @@ do_flush(Data0, #{
%% Success; just ack. %% Success; just ack.
false -> false ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
CurrentCount = queue_count(Q1), CurrentCount = queue_count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
@ -736,8 +736,8 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
PostFn(), PostFn(),
?MODULE:block(Pid); ?MODULE:block(Pid);
{false, PostFn} -> {false, PostFn} ->
IsDropped = drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
IsDropped andalso PostFn(), IsAcked andalso PostFn(),
ok ok
end. end.
@ -750,19 +750,19 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
lists:foreach(fun(F) -> F() end, PostFns), lists:foreach(fun(F) -> F() end, PostFns),
?MODULE:block(Pid); ?MODULE:block(Pid);
{false, PostFns} -> {false, PostFns} ->
IsDropped = drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
IsDropped andalso lists:foreach(fun(F) -> F() end, PostFns), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
ok ok
end. end.
drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
case is_inflight_full(InflightTID) of case is_inflight_full(InflightTID) of
true -> true ->
IsDropped = inflight_drop(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
?MODULE:resume(Pid), ?MODULE:resume(Pid),
IsDropped; IsAcked;
false -> false ->
inflight_drop(InflightTID, Ref, Id, Index) ack_inflight(InflightTID, Ref, Id, Index)
end. end.
%%============================================================================== %%==============================================================================
@ -871,19 +871,19 @@ inflight_append(InflightTID, Ref, Data, _Id, _Index) ->
%% the inflight metric. %% the inflight metric.
ok. ok.
inflight_drop(undefined, _Ref, _Id, _Index) -> ack_inflight(undefined, _Ref, _Id, _Index) ->
false; false;
inflight_drop(InflightTID, Ref, Id, Index) -> ack_inflight(InflightTID, Ref, Id, Index) ->
Count = Count =
case ets:take(InflightTID, Ref) of case ets:take(InflightTID, Ref) of
[{Ref, ?QUERY(_, _, _)}] -> 1; [{Ref, ?QUERY(_, _, _)}] -> 1;
[{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
_ -> 0 _ -> 0
end, end,
IsDropped = Count > 0, IsAcked = Count > 0,
IsDropped andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
IsDropped. IsAcked.
%%============================================================================== %%==============================================================================