diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a37d78494..d3b797fd5 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -313,14 +313,14 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working {false, PostFn} -> - IsDropped = inflight_drop(InflightTID, Ref, Id, Index), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with %% the retry, bumping them twice. Since both inflight %% requests (repeated and original) have the safe `Ref', %% we bump the counter when removing it from the table. - IsDropped andalso PostFn(), + IsAcked andalso PostFn(), ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), resume_from_blocked(Data0) end. @@ -438,7 +438,7 @@ do_flush( %% Success; just ack. false -> ok = replayq:ack(Q1, QAckRef), - is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), + is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), case queue_count(Q1) > 0 of true -> @@ -478,7 +478,7 @@ do_flush(Data0, #{ %% Success; just ack. false -> ok = replayq:ack(Q1, QAckRef), - is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), + is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of @@ -736,8 +736,8 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee PostFn(), ?MODULE:block(Pid); {false, PostFn} -> - IsDropped = drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), - IsDropped andalso PostFn(), + IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), + IsAcked andalso PostFn(), ok end. @@ -750,19 +750,19 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> lists:foreach(fun(F) -> F() end, PostFns), ?MODULE:block(Pid); {false, PostFns} -> - IsDropped = drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), - IsDropped andalso lists:foreach(fun(F) -> F() end, PostFns), + IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), + IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), ok end. -drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> +ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> case is_inflight_full(InflightTID) of true -> - IsDropped = inflight_drop(InflightTID, Ref, Id, Index), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), ?MODULE:resume(Pid), - IsDropped; + IsAcked; false -> - inflight_drop(InflightTID, Ref, Id, Index) + ack_inflight(InflightTID, Ref, Id, Index) end. %%============================================================================== @@ -871,19 +871,19 @@ inflight_append(InflightTID, Ref, Data, _Id, _Index) -> %% the inflight metric. ok. -inflight_drop(undefined, _Ref, _Id, _Index) -> +ack_inflight(undefined, _Ref, _Id, _Index) -> false; -inflight_drop(InflightTID, Ref, Id, Index) -> +ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of [{Ref, ?QUERY(_, _, _)}] -> 1; [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); _ -> 0 end, - IsDropped = Count > 0, - IsDropped andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + IsAcked = Count > 0, + IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - IsDropped. + IsAcked. %%==============================================================================