fix: always ack async replies
The caller should decide if it should retry in that case, to avoid overwhelming the resource with retries.
This commit is contained in:
parent
bd95a95409
commit
344eeebe63
|
@ -724,13 +724,17 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
|
|||
%% NOTE: 'inflight' is the count of messages that were sent async
|
||||
%% but received no ACK, NOT the number of messages queued in the
|
||||
%% inflight window.
|
||||
case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
|
||||
{nack, PostFn} ->
|
||||
PostFn(),
|
||||
{Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)),
|
||||
%% Should always ack async inflight requests that
|
||||
%% returned, otherwise the request will get retried. The
|
||||
%% caller has just been notified of the failure and should
|
||||
%% decide if it wants to retry or not.
|
||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
||||
IsAcked andalso PostFn(),
|
||||
case Action of
|
||||
nack ->
|
||||
?MODULE:block(Pid);
|
||||
{ack, PostFn} ->
|
||||
IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
|
||||
IsAcked andalso PostFn(),
|
||||
ack ->
|
||||
ok
|
||||
end.
|
||||
|
||||
|
@ -738,25 +742,29 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
|
|||
%% NOTE: 'inflight' is the count of messages that were sent async
|
||||
%% but received no ACK, NOT the number of messages queued in the
|
||||
%% inflight window.
|
||||
case batch_reply_caller_defer_metrics(Id, Result, Batch) of
|
||||
{nack, PostFns} ->
|
||||
lists:foreach(fun(F) -> F() end, PostFns),
|
||||
{Action, PostFns} = batch_reply_caller_defer_metrics(Id, Result, Batch),
|
||||
%% Should always ack async inflight requests that
|
||||
%% returned, otherwise the request will get retried. The
|
||||
%% caller has just been notified of the failure and should
|
||||
%% decide if it wants to retry or not.
|
||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
||||
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
|
||||
case Action of
|
||||
nack ->
|
||||
?MODULE:block(Pid);
|
||||
{ack, PostFns} ->
|
||||
IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
|
||||
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
|
||||
ack ->
|
||||
ok
|
||||
end.
|
||||
|
||||
ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
|
||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
||||
case is_inflight_full(InflightTID) of
|
||||
true ->
|
||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
||||
?MODULE:resume(Pid),
|
||||
IsAcked;
|
||||
ok;
|
||||
false ->
|
||||
ack_inflight(InflightTID, Ref, Id, Index)
|
||||
end.
|
||||
?MODULE:resume(Pid)
|
||||
end,
|
||||
IsAcked.
|
||||
|
||||
%%==============================================================================
|
||||
%% operations for queue
|
||||
|
|
Loading…
Reference in New Issue