From 713220f88b8a0bbc5dec89eb54fd9e5b9415d311 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 00:04:20 +0100 Subject: [PATCH] refactor(buffer_worker): more generic process for all_expired --- .../src/emqx_resource_buffer_worker.erl | 110 +++++++++--------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6aa13092a..ff3b67c7a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -489,7 +489,7 @@ flush(Data0) -> %% if the request has expired, the caller is no longer %% waiting for a response. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -1031,9 +1031,6 @@ do_handle_async_reply( handle_async_batch_reply( #{ - buffer_worker := Pid, - resource_id := Id, - worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, batch := Batch @@ -1046,44 +1043,59 @@ handle_async_batch_reply( ), Now = now_(), case sieve_expired_requests(Batch, Now) of - all_expired -> - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), - IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid), - ?tp(handle_async_reply_expired, #{expired => Batch}), - ok; {_NotExpired, []} -> + %% this is the critical code path, + %% we try not to do ets:lookup in this case + %% because the batch can be quite big do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> - %% partial expire + %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - case ets:lookup(InflightTID, Ref) of - [] -> - %% e.g. if the driver evaluates it more than once - %% which should really be a bug, TODO: add a unknown_reply counter - ok; - [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] -> - %% All batch items share the same HasBeenSent flag - %% So we just take the original flag from the ReplyContext batch - %% and put it back to the batch found in inflight table - %% which must have already been set to `false` - [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, - {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), - RealNotExpired = - lists:map( - fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> - ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) - end, - RealNotExpired0 - ), - NumExpired = length(RealExpired), - emqx_resource_metrics:late_reply_inc(Id, NumExpired), - ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) - end + handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end. + +handle_async_batch_reply2([], _, _, _) -> + %% e.g. if the driver evaluates the callback more than once + %% which should really be a bug + ok; +handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> + ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch + } = ReplyContext, + %% All batch items share the same HasBeenSent flag + %% So we just take the original flag from the ReplyContext batch + %% and put it back to the batch found in inflight table + %% which must have already been set to `false` + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + RealNotExpired = + lists:map( + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + end, + RealNotExpired0 + ), + NumExpired = length(RealExpired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + case RealNotExpired of + [] -> + %% all expired, no need to update back the inflight batch + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid); + _ -> + %% some queries are not expired, put them back to the inflight batch + %% so it can be either acked now or retried later + ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) end. do_handle_async_batch_reply( @@ -1226,10 +1238,8 @@ inflight_get_first_retriable(InflightTID, Now) -> {single, Ref, Query} end; {[{Ref, Batch = [_ | _]}], _Continuation} -> - %% batch is non-empty because we check that in - %% `sieve_expired_requests'. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> {expired, Ref, Batch}; {NotExpired, Expired} -> {batch, Ref, NotExpired, Expired} @@ -1482,22 +1492,12 @@ is_async_return(_) -> false. sieve_expired_requests(Batch, Now) -> - {Expired, NotExpired} = - lists:partition( - fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> - is_expired(ExpireAt, Now) - end, - Batch - ), - case {NotExpired, Expired} of - {[], []} -> - %% Should be impossible for batch_size >= 1. - all_expired; - {[], [_ | _]} -> - all_expired; - {[_ | _], _} -> - {NotExpired, Expired} - end. + lists:partition( + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> + not is_expired(ExpireAt, Now) + end, + Batch + ). -spec is_expired(infinity | integer(), integer()) -> boolean(). is_expired(infinity = _ExpireAt, _Now) ->