From 344eeebe6321a602631b663724fc2bcaf311afd4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 13 Jan 2023 16:23:12 -0300 Subject: [PATCH] fix: always ack async replies The caller should decide if it should retry in that case, to avoid overwhelming the resource with retries. --- .../src/emqx_resource_worker.erl | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 2d646f4b8..f9940210a 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -724,13 +724,17 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of - {nack, PostFn} -> - PostFn(), + {Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)), + %% Should always ack async inflight requests that + %% returned, otherwise the request will get retried. The + %% caller has just been notified of the failure and should + %% decide if it wants to retry or not. + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso PostFn(), + case Action of + nack -> ?MODULE:block(Pid); - {ack, PostFn} -> - IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), - IsAcked andalso PostFn(), + ack -> ok end. @@ -738,25 +742,29 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - case batch_reply_caller_defer_metrics(Id, Result, Batch) of - {nack, PostFns} -> - lists:foreach(fun(F) -> F() end, PostFns), + {Action, PostFns} = batch_reply_caller_defer_metrics(Id, Result, Batch), + %% Should always ack async inflight requests that + %% returned, otherwise the request will get retried. The + %% caller has just been notified of the failure and should + %% decide if it wants to retry or not. + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), + case Action of + nack -> ?MODULE:block(Pid); - {ack, PostFns} -> - IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), - IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), + ack -> ok end. ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), case is_inflight_full(InflightTID) of true -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - ?MODULE:resume(Pid), - IsAcked; + ok; false -> - ack_inflight(InflightTID, Ref, Id, Index) - end. + ?MODULE:resume(Pid) + end, + IsAcked. %%============================================================================== %% operations for queue