From 196bf1c5ba0e04ada28646e0931ef1e601d7644d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 12 Jan 2023 16:47:45 -0300 Subject: [PATCH] feat: mass collect calls from mailbox also when blocked --- .../src/emqx_resource_worker.erl | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 7ac6c5250..a37d78494 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -213,23 +213,11 @@ blocked(cast, resume, St) -> resume_from_blocked(St); blocked(state_timeout, resume, St) -> resume_from_blocked(St); -blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) -> - #{ - id := Id, - index := Index, - queue := Q - } = Data0, - From = - case ReqFrom of - undefined -> maps:get(async_reply_fun, Opts, undefined); - From1 -> From1 - end, +blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> + #{id := Id} = Data0, + {Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - HasBeenSent = false, - _ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)), - %% TODO collect requests - NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]), - Data = Data0#{queue := NewQ}, + _ = batch_reply_caller(Id, Error, Queries), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> keep_state_and_data; @@ -340,13 +328,17 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% Called during the `running' state only. -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data(). handle_query_requests(Request0, Data0) -> + {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), + maybe_flush(Data). + +collect_and_enqueue_query_requests(Request0, Data0) -> #{ id := Id, index := Index, queue := Q } = Data0, Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT), - QueueItems = + Queries = lists:map( fun (?SEND_REQ(undefined = _From, {query, Req, Opts})) -> @@ -359,9 +351,9 @@ handle_query_requests(Request0, Data0) -> end, Requests ), - NewQ = append_queue(Id, Index, Q, QueueItems), + NewQ = append_queue(Id, Index, Q, Queries), Data = Data0#{queue := NewQ}, - maybe_flush(Data). + {Queries, Data}. maybe_flush(Data0) -> #{