From d4fab92b72237b850b4b7e231eca9e295c48a2b6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 26 Jan 2023 18:00:20 +0100 Subject: [PATCH] refactor(buffer_worker): no need to keep request for REPLY macro --- .../src/emqx_resource_buffer_worker.erl | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 3d08f0289..5460a8198 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -63,11 +63,7 @@ -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). -define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). --define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). --define(EXPAND(RESULT, BATCH), [ - ?REPLY(FROM, REQUEST, SENT, RESULT) - || ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH -]). +-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), {Ref, BatchOrQuery, IsRetriable, WorkerMRef} ). @@ -370,8 +366,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of - ?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) -> - Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + ?QUERY(From, _, HasBeenSent, _ExpireAt) -> + Reply = ?REPLY(From, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) @@ -548,10 +544,10 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch, + [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), - Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + Reply = ?REPLY(From, HasBeenSent, Result), case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. @@ -705,6 +701,14 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) -> ShouldBlock. batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> + %% the `Mod:on_batch_query/3` returns a single result for a batch, + %% so we need to expand + Replies = lists:map( + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> + ?REPLY(FROM, SENT, BatchResult) + end, + Batch + ), {ShouldAck, PostFns} = lists:foldl( fun(Reply, {_ShouldAck, PostFns}) -> @@ -712,9 +716,7 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> {ShouldAck, [PostFn | PostFns]} end, {ack, []}, - %% the `Mod:on_batch_query/3` returns a single result for a batch, - %% so we need to expand - ?EXPAND(BatchResult, Batch) + Replies ), PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, {ShouldAck, PostFn}. @@ -726,9 +728,9 @@ reply_caller(Id, Reply, QueryOpts) -> %% Should only reply to the caller when the decision is final (not %% retriable). See comment on `handle_query_result_pure'. -reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) -> +reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> handle_query_result_pure(Id, Result, HasBeenSent); -reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when +reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when is_function(ReplyFun) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), @@ -750,7 +752,7 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), ok end, {ShouldAck, PostFn}; -reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) -> +reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsUnrecoverableError = is_unrecoverable_error(Result), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), @@ -989,7 +991,7 @@ do_reply_after_query( Index, InflightTID, Ref, - ?QUERY(From, Request, HasBeenSent, _ExpireAt), + ?QUERY(From, _Request, HasBeenSent, _ExpireAt), QueryOpts, Result ) -> @@ -997,14 +999,14 @@ do_reply_after_query( %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics( - Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts + Id, ?REPLY(From, HasBeenSent, Result), QueryOpts ), case Action of nack -> %% Keep retrying. ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -1013,7 +1015,7 @@ do_reply_after_query( ack -> ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }),