From 356a94af30ec168f8c2a2d6a7360e7b446e14e26 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 09:47:34 +0100 Subject: [PATCH] fix(buffer_worker): ensure async flush message is sent This is a new issue introduced in the previous fix commits: after handling the partial expiry correctly, the IsFullBefore check no longer reflects the state before the reply is received, but the state after a partially-expired batch has been shrunk. The fix is simple: move the check to the entry point where the async reply callback enters, then send an async 'flush' notification regardless of the handling result. --- .../src/emqx_resource_buffer_worker.erl | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index ff3b67c7a..0a6adf3d6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1019,15 +1019,16 @@ do_handle_async_reply( ref => Ref, result => Result }), - + IsFullBefore = is_inflight_full(InflightTID), case Action of nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) - end. + do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). 
handle_async_batch_reply( #{ @@ -1042,19 +1043,21 @@ handle_async_batch_reply( #{batch_or_query => Batch, ref => Ref} ), Now = now_(), + IsFullBefore = is_inflight_full(InflightTID), case sieve_expired_requests(Batch, Now) of {_NotExpired, []} -> %% this is the critical code path, %% we try not to do ets:lookup in this case %% because the batch can be quite big - do_handle_async_batch_reply(ReplyContext, Result); + ok = do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) - end. + ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply2([], _, _, _) -> %% e.g. 
if the driver evaluates the callback more than once @@ -1063,7 +1066,6 @@ handle_async_batch_reply2([], _, _, _) -> handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, #{ - buffer_worker := Pid, resource_id := Id, worker_index := Index, inflight_tid := InflightTID, @@ -1088,15 +1090,15 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid); + _ = ack_inflight(InflightTID, Ref, Id, Index), + ok; _ -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) - end. + ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + end, + ok. do_handle_async_batch_reply( #{ @@ -1123,11 +1125,10 @@ do_handle_async_batch_reply( ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. -do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> - IsFullBefore = is_inflight_full(InflightTID), +do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), case maps:get(simple_query, QueryOpts, false) of true -> @@ -1137,9 +1138,18 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> false -> ok end, - IsFullBefore andalso ?MODULE:flush_worker(WorkerPid), ok. 
+maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> + %% inflight was not full before async reply is handled, + %% after it is handled, the inflight table must be even smaller + %% hence we can rely on the buffer worker's flush timer to trigger + %% the next flush + ok; +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> + %% the inflight table was full before handling async reply + ok = ?MODULE:flush_worker(self()). + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) ->