From db2f631a8a49289f30eb59624cd1c220de505c03 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 11:16:06 +0100 Subject: [PATCH] refactor(buffer_worker): simplify caller reply --- .../src/emqx_resource_buffer_worker.erl | 95 +++++++------------ 1 file changed, 36 insertions(+), 59 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 5460a8198..c7a061b61 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -210,7 +210,7 @@ running(cast, flush, Data) -> flush(Data); running(cast, block, St) -> {next_state, blocked, St}; -running(info, ?SEND_REQ(_From, _Req) = Request0, Data) -> +running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) -> handle_query_requests(Request0, Data); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); @@ -238,7 +238,7 @@ blocked(cast, flush, Data) -> resume_from_blocked(Data); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); -blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) -> +blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> @@ -284,7 +284,8 @@ pick_call(Id, Key, Query, Timeout) -> Caller = self(), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), From = {Caller, MRef}, - erlang:send(Pid, ?SEND_REQ(From, Query)), + ReplyTo = {fun gen_statem:reply/2, [From]}, + erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), receive {MRef, Response} -> erlang:demonitor(MRef, [flush]), @@ -304,8 +305,8 @@ pick_call(Id, Key, Query, Timeout) -> pick_cast(Id, Key, Query) -> ?PICK(Id, Key, Pid, begin - From = undefined, - erlang:send(Pid, ?SEND_REQ(From, Query)), + ReplyTo = undefined, + erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), ok end). @@ -366,8 +367,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of - ?QUERY(From, _, HasBeenSent, _ExpireAt) -> - Reply = ?REPLY(From, HasBeenSent, Result), + ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> + Reply = ?REPLY(ReplyTo, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); [?QUERY(_, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) @@ -421,15 +422,15 @@ collect_and_enqueue_query_requests(Request0, Data0) -> Queries = lists:map( fun - (?SEND_REQ(undefined = _From, {query, Req, Opts})) -> + (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); - (?SEND_REQ(From, {query, Req, Opts})) -> + (?SEND_REQ(ReplyTo, {query, Req, Opts})) -> HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), - ?QUERY(From, Req, HasBeenSent, ExpireAt) + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt) end, Requests ), @@ -439,17 +440,18 @@ collect_and_enqueue_query_requests(Request0, Data0) -> reply_overflown([]) -> ok; -reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) -> - do_reply_caller(From, {error, buffer_overflow}), +reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) -> + do_reply_caller(ReplyTo, {error, buffer_overflow}), reply_overflown(More). do_reply_caller(undefined, _Result) -> ok; +do_reply_caller({F, Args}, {async_return, Result}) -> + %% this is an early return to async caller, the retry + %% decision has to be made by the caller + do_reply_caller({F, Args}, Result); do_reply_caller({F, Args}, Result) when is_function(F) -> _ = erlang:apply(F, Args ++ [Result]), - ok; -do_reply_caller(From, Result) -> - _ = gen_statem:reply(From, Result), ok. maybe_flush(Data0) -> @@ -544,10 +546,10 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch, + [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), - Reply = ?REPLY(From, HasBeenSent, Result), + Reply = ?REPLY(ReplyTo, HasBeenSent, Result), case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. @@ -730,46 +732,21 @@ reply_caller(Id, Reply, QueryOpts) -> %% retriable). See comment on `handle_query_result_pure'. reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> handle_query_result_pure(Id, Result, HasBeenSent); -reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when - is_function(ReplyFun) --> +reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsUnrecoverableError = is_unrecoverable_error(Result), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of {ack, {async_return, _}, true, _} -> - apply(ReplyFun, Args ++ [Result]), - ok; + ok = do_reply_caller(ReplyTo, Result); {ack, {async_return, _}, false, _} -> ok; {_, _, _, true} -> - apply(ReplyFun, Args ++ [Result]), - ok; + ok = do_reply_caller(ReplyTo, Result); {nack, _, _, _} -> ok; {ack, _, _, _} -> - apply(ReplyFun, Args ++ [Result]), - ok - end, - {ShouldAck, PostFn}; -reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) -> - IsSimpleQuery = maps:get(simple_query, QueryOpts, false), - IsUnrecoverableError = is_unrecoverable_error(Result), - {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), - case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of - {ack, {async_return, _}, true, _} -> - gen_statem:reply(From, Result), - ok; - {ack, {async_return, _}, false, _} -> - ok; - {_, _, _, true} -> - gen_statem:reply(From, Result), - ok; - {nack, _, _, _} -> - ok; - {ack, _, _, _} -> - gen_statem:reply(From, Result), - ok + ok = do_reply_caller(ReplyTo, Result) end, {ShouldAck, PostFn}. @@ -935,7 +912,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], + Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch], ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ @@ -947,7 +924,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re begin ReplyFun = fun ?MODULE:batch_reply_after_query/8, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, - Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], + Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch], IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), @@ -964,7 +941,7 @@ reply_after_query( Index, InflightTID, Ref, - ?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query, + ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query, QueryOpts, Result ) -> @@ -991,7 +968,7 @@ do_reply_after_query( Index, InflightTID, Ref, - ?QUERY(From, _Request, HasBeenSent, _ExpireAt), + ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), QueryOpts, Result ) -> @@ -999,14 +976,14 @@ do_reply_after_query( %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics( - Id, ?REPLY(From, HasBeenSent, Result), QueryOpts + Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts ), case Action of nack -> %% Keep retrying. ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -1015,7 +992,7 @@ do_reply_after_query( ack -> ?tp(buffer_worker_reply_after_query, #{ action => Action, - batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), + batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), ref => Ref, result => Result }), @@ -1175,7 +1152,7 @@ inflight_get_first_retriable(InflightTID, Now) -> case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> none; - {[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> + {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> case is_expired(ExpireAt, Now) of true -> {expired, Ref, [Query]}; @@ -1234,7 +1211,7 @@ inflight_append( inflight_append( InflightTID, ?INFLIGHT_ITEM( - Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef + Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef ), Id, Index @@ -1405,7 +1382,7 @@ do_collect_requests(Acc, Count, Limit) when Count >= Limit -> lists:reverse(Acc); do_collect_requests(Acc, Count, Limit) -> receive - ?SEND_REQ(_From, _Req) = Request -> + ?SEND_REQ(_ReplyTo, _Req) = Request -> do_collect_requests([Request | Acc], Count + 1, Limit) after 0 -> lists:reverse(Acc) @@ -1413,9 +1390,9 @@ do_collect_requests(Acc, Count, Limit) -> mark_as_sent(Batch) when is_list(Batch) -> lists:map(fun mark_as_sent/1, Batch); -mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) -> +mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) -> HasBeenSent = true, - ?QUERY(From, Req, HasBeenSent, ExpireAt). + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt). is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; @@ -1439,7 +1416,7 @@ is_async_return(_) -> sieve_expired_requests(Batch, Now) -> {Expired, NotExpired} = lists:partition( - fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) -> + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> is_expired(ExpireAt, Now) end, Batch