feat: mass collect calls from mailbox also when blocked

This commit is contained in:
Thales Macedo Garitezi 2023-01-12 16:47:45 -03:00
parent d4724d6ce9
commit 196bf1c5ba
1 changed files with 11 additions and 19 deletions

View File

@ -213,23 +213,11 @@ blocked(cast, resume, St) ->
resume_from_blocked(St); resume_from_blocked(St);
blocked(state_timeout, resume, St) -> blocked(state_timeout, resume, St) ->
resume_from_blocked(St); resume_from_blocked(St);
blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) -> blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
#{ #{id := Id} = Data0,
id := Id, {Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
index := Index,
queue := Q
} = Data0,
From =
case ReqFrom of
undefined -> maps:get(async_reply_fun, Opts, undefined);
From1 -> From1
end,
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
HasBeenSent = false, _ = batch_reply_caller(Id, Error, Queries),
_ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)),
%% TODO collect requests
NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]),
Data = Data0#{queue := NewQ},
{keep_state, Data}; {keep_state, Data};
blocked(info, {flush, _Ref}, _Data) -> blocked(info, {flush, _Ref}, _Data) ->
keep_state_and_data; keep_state_and_data;
@ -340,13 +328,17 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% Called during the `running' state only. %% Called during the `running' state only.
-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data(). -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data().
handle_query_requests(Request0, Data0) -> handle_query_requests(Request0, Data0) ->
{_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
maybe_flush(Data).
collect_and_enqueue_query_requests(Request0, Data0) ->
#{ #{
id := Id, id := Id,
index := Index, index := Index,
queue := Q queue := Q
} = Data0, } = Data0,
Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT), Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
QueueItems = Queries =
lists:map( lists:map(
fun fun
(?SEND_REQ(undefined = _From, {query, Req, Opts})) -> (?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
@ -359,9 +351,9 @@ handle_query_requests(Request0, Data0) ->
end, end,
Requests Requests
), ),
NewQ = append_queue(Id, Index, Q, QueueItems), NewQ = append_queue(Id, Index, Q, Queries),
Data = Data0#{queue := NewQ}, Data = Data0#{queue := NewQ},
maybe_flush(Data). {Queries, Data}.
maybe_flush(Data0) -> maybe_flush(Data0) ->
#{ #{