refactor(buffer_worker): simplify caller reply

This commit is contained in:
Zaiming (Stone) Shi 2023-01-27 11:16:06 +01:00
parent 965236c888
commit db2f631a8a
1 changed files with 36 additions and 59 deletions

View File

@ -210,7 +210,7 @@ running(cast, flush, Data) ->
flush(Data); flush(Data);
running(cast, block, St) -> running(cast, block, St) ->
{next_state, blocked, St}; {next_state, blocked, St};
running(info, ?SEND_REQ(_From, _Req) = Request0, Data) -> running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
handle_query_requests(Request0, Data); handle_query_requests(Request0, Data);
running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
flush(St#{tref := undefined}); flush(St#{tref := undefined});
@ -238,7 +238,7 @@ blocked(cast, flush, Data) ->
resume_from_blocked(Data); resume_from_blocked(Data);
blocked(state_timeout, unblock, St) -> blocked(state_timeout, unblock, St) ->
resume_from_blocked(St); resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReqFrom, _Req) = Request0, Data0) -> blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) ->
Data = collect_and_enqueue_query_requests(Request0, Data0), Data = collect_and_enqueue_query_requests(Request0, Data0),
{keep_state, Data}; {keep_state, Data};
blocked(info, {flush, _Ref}, _Data) -> blocked(info, {flush, _Ref}, _Data) ->
@ -284,7 +284,8 @@ pick_call(Id, Key, Query, Timeout) ->
Caller = self(), Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef}, From = {Caller, MRef},
erlang:send(Pid, ?SEND_REQ(From, Query)), ReplyTo = {fun gen_statem:reply/2, [From]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive receive
{MRef, Response} -> {MRef, Response} ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
@ -304,8 +305,8 @@ pick_call(Id, Key, Query, Timeout) ->
pick_cast(Id, Key, Query) -> pick_cast(Id, Key, Query) ->
?PICK(Id, Key, Pid, begin ?PICK(Id, Key, Pid, begin
From = undefined, ReplyTo = undefined,
erlang:send(Pid, ?SEND_REQ(From, Query)), erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
ok ok
end). end).
@ -366,8 +367,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult = ReplyResult =
case QueryOrBatch of case QueryOrBatch of
?QUERY(From, _, HasBeenSent, _ExpireAt) -> ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
Reply = ?REPLY(From, HasBeenSent, Result), Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply, QueryOpts); reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _, _) | _] = Batch -> [?QUERY(_, _, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
@ -421,15 +422,15 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
Queries = Queries =
lists:map( lists:map(
fun fun
(?SEND_REQ(undefined = _From, {query, Req, Opts})) -> (?SEND_REQ(undefined = _ReplyTo, {query, Req, Opts})) ->
ReplyFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
HasBeenSent = false, HasBeenSent = false,
ExpireAt = maps:get(expire_at, Opts), ExpireAt = maps:get(expire_at, Opts),
?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
(?SEND_REQ(From, {query, Req, Opts})) -> (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
HasBeenSent = false, HasBeenSent = false,
ExpireAt = maps:get(expire_at, Opts), ExpireAt = maps:get(expire_at, Opts),
?QUERY(From, Req, HasBeenSent, ExpireAt) ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
end, end,
Requests Requests
), ),
@ -439,17 +440,18 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
reply_overflown([]) -> reply_overflown([]) ->
ok; ok;
reply_overflown([?QUERY(From, _Req, _HasBeenSent, _ExpireAt) | More]) -> reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
do_reply_caller(From, {error, buffer_overflow}), do_reply_caller(ReplyTo, {error, buffer_overflow}),
reply_overflown(More). reply_overflown(More).
do_reply_caller(undefined, _Result) -> do_reply_caller(undefined, _Result) ->
ok; ok;
do_reply_caller({F, Args}, {async_return, Result}) ->
%% this is an early return to async caller, the retry
%% decision has to be made by the caller
do_reply_caller({F, Args}, Result);
do_reply_caller({F, Args}, Result) when is_function(F) -> do_reply_caller({F, Args}, Result) when is_function(F) ->
_ = erlang:apply(F, Args ++ [Result]), _ = erlang:apply(F, Args ++ [Result]),
ok;
do_reply_caller(From, Result) ->
_ = gen_statem:reply(From, Result),
ok. ok.
maybe_flush(Data0) -> maybe_flush(Data0) ->
@ -544,10 +546,10 @@ do_flush(
inflight_tid := InflightTID inflight_tid := InflightTID
} = Data0, } = Data0,
%% unwrap when not batching (i.e., batch size == 1) %% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, _, HasBeenSent, _ExpireAt) = Request] = Batch, [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, HasBeenSent, Result), Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
case reply_caller(Id, Reply, QueryOpts) of case reply_caller(Id, Reply, QueryOpts) of
%% Failed; remove the request from the queue, as we cannot pop %% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table. %% from it again, but we'll retry it using the inflight table.
@ -730,46 +732,21 @@ reply_caller(Id, Reply, QueryOpts) ->
%% retriable). See comment on `handle_query_result_pure'. %% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
handle_query_result_pure(Id, Result, HasBeenSent); handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, HasBeenSent, Result), QueryOpts) when reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
is_function(ReplyFun)
->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result), IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{ack, {async_return, _}, true, _} -> {ack, {async_return, _}, true, _} ->
apply(ReplyFun, Args ++ [Result]), ok = do_reply_caller(ReplyTo, Result);
ok;
{ack, {async_return, _}, false, _} -> {ack, {async_return, _}, false, _} ->
ok; ok;
{_, _, _, true} -> {_, _, _, true} ->
apply(ReplyFun, Args ++ [Result]), ok = do_reply_caller(ReplyTo, Result);
ok;
{nack, _, _, _} -> {nack, _, _, _} ->
ok; ok;
{ack, _, _, _} -> {ack, _, _, _} ->
apply(ReplyFun, Args ++ [Result]), ok = do_reply_caller(ReplyTo, Result)
ok
end,
{ShouldAck, PostFn};
reply_caller_defer_metrics(Id, ?REPLY(From, HasBeenSent, Result), QueryOpts) ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{ack, {async_return, _}, true, _} ->
gen_statem:reply(From, Result),
ok;
{ack, {async_return, _}, false, _} ->
ok;
{_, _, _, true} ->
gen_statem:reply(From, Result),
ok;
{nack, _, _, _} ->
ok;
{ack, _, _, _} ->
gen_statem:reply(From, Result),
ok
end, end,
{ShouldAck, PostFn}. {ShouldAck, PostFn}.
@ -935,7 +912,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R
?tp(call_batch_query, #{ ?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}), }),
Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{ ?tp(call_batch_query_async, #{
@ -947,7 +924,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
begin begin
ReplyFun = fun ?MODULE:batch_reply_after_query/8, ReplyFun = fun ?MODULE:batch_reply_after_query/8,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch], Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch],
IsRetriable = false, IsRetriable = false,
WorkerMRef = undefined, WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
@ -964,7 +941,7 @@ reply_after_query(
Index, Index,
InflightTID, InflightTID,
Ref, Ref,
?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query, ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query,
QueryOpts, QueryOpts,
Result Result
) -> ) ->
@ -991,7 +968,7 @@ do_reply_after_query(
Index, Index,
InflightTID, InflightTID,
Ref, Ref,
?QUERY(From, _Request, HasBeenSent, _ExpireAt), ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
QueryOpts, QueryOpts,
Result Result
) -> ) ->
@ -999,14 +976,14 @@ do_reply_after_query(
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
{Action, PostFn} = reply_caller_defer_metrics( {Action, PostFn} = reply_caller_defer_metrics(
Id, ?REPLY(From, HasBeenSent, Result), QueryOpts Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts
), ),
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
?tp(buffer_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
ref => Ref, ref => Ref,
result => Result result => Result
}), }),
@ -1015,7 +992,7 @@ do_reply_after_query(
ack -> ack ->
?tp(buffer_worker_reply_after_query, #{ ?tp(buffer_worker_reply_after_query, #{
action => Action, action => Action,
batch_or_query => ?QUERY(From, _Request, HasBeenSent, _ExpireAt), batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt),
ref => Ref, ref => Ref,
result => Result result => Result
}), }),
@ -1175,7 +1152,7 @@ inflight_get_first_retriable(InflightTID, Now) ->
case ets:select(InflightTID, MatchSpec, _Limit = 1) of case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' -> '$end_of_table' ->
none; none;
{[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
case is_expired(ExpireAt, Now) of case is_expired(ExpireAt, Now) of
true -> true ->
{expired, Ref, [Query]}; {expired, Ref, [Query]};
@ -1234,7 +1211,7 @@ inflight_append(
inflight_append( inflight_append(
InflightTID, InflightTID,
?INFLIGHT_ITEM( ?INFLIGHT_ITEM(
Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
), ),
Id, Id,
Index Index
@ -1405,7 +1382,7 @@ do_collect_requests(Acc, Count, Limit) when Count >= Limit ->
lists:reverse(Acc); lists:reverse(Acc);
do_collect_requests(Acc, Count, Limit) -> do_collect_requests(Acc, Count, Limit) ->
receive receive
?SEND_REQ(_From, _Req) = Request -> ?SEND_REQ(_ReplyTo, _Req) = Request ->
do_collect_requests([Request | Acc], Count + 1, Limit) do_collect_requests([Request | Acc], Count + 1, Limit)
after 0 -> after 0 ->
lists:reverse(Acc) lists:reverse(Acc)
@ -1413,9 +1390,9 @@ do_collect_requests(Acc, Count, Limit) ->
mark_as_sent(Batch) when is_list(Batch) -> mark_as_sent(Batch) when is_list(Batch) ->
lists:map(fun mark_as_sent/1, Batch); lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) -> mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
HasBeenSent = true, HasBeenSent = true,
?QUERY(From, Req, HasBeenSent, ExpireAt). ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
is_unrecoverable_error({error, {unrecoverable_error, _}}) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true; true;
@ -1439,7 +1416,7 @@ is_async_return(_) ->
sieve_expired_requests(Batch, Now) -> sieve_expired_requests(Batch, Now) ->
{Expired, NotExpired} = {Expired, NotExpired} =
lists:partition( lists:partition(
fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) -> fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
is_expired(ExpireAt, Now) is_expired(ExpireAt, Now)
end, end,
Batch Batch