diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index ff3b67c7a..0a6adf3d6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1019,15 +1019,16 @@ do_handle_async_reply( ref => Ref, result => Result }), - + IsFullBefore = is_inflight_full(InflightTID), case Action of nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) - end. + do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply( #{ @@ -1042,19 +1043,21 @@ handle_async_batch_reply( #{batch_or_query => Batch, ref => Ref} ), Now = now_(), + IsFullBefore = is_inflight_full(InflightTID), case sieve_expired_requests(Batch, Now) of {_NotExpired, []} -> %% this is the critical code path, %% we try not to do ets:lookup in this case %% because the batch can be quite big - do_handle_async_batch_reply(ReplyContext, Result); + ok = do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) - end. + ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply2([], _, _, _) -> %% e.g. if the driver evaluates the callback more than once @@ -1063,7 +1066,6 @@ handle_async_batch_reply2([], _, _, _) -> handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, #{ - buffer_worker := Pid, resource_id := Id, worker_index := Index, inflight_tid := InflightTID, @@ -1088,15 +1090,15 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid); + _ = ack_inflight(InflightTID, Ref, Id, Index), + ok; _ -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) - end. + ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + end, + ok. do_handle_async_batch_reply( #{ @@ -1123,11 +1125,10 @@ do_handle_async_batch_reply( ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. -do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> - IsFullBefore = is_inflight_full(InflightTID), +do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), case maps:get(simple_query, QueryOpts, false) of true -> @@ -1137,9 +1138,18 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> false -> ok end, - IsFullBefore andalso ?MODULE:flush_worker(WorkerPid), ok. +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> + %% inflight was not full before async reply is handled, + %% after it is handled, the inflight table must be even smaller + %% hance we can rely on the buffer worker's flush timer to trigger + %% the next flush + ok; +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> + %% the inflight table was full before handling aync reply + ok = ?MODULE:flush_worker(self()). + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) ->