Merge pull request #12564 from thalesmg/bw-support-batch-list-resp-m-20240221

feat(resource): allow `on_batch_query{,_async}` to return a list of individual results
Thales Macedo Garitezi, 2024-02-23 09:37:42 -03:00, committed by GitHub
commit 15f919e60f
6 changed files with 176 additions and 13 deletions
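
For orientation: before this change, `on_batch_query/3` (and its async variant) had to return a single `query_result()` that was applied to every request in the batch; after this commit they may instead return a list with exactly one result per request, in batch order. A minimal sketch of a connector using the new shape (the module name and the `{send, _}` request are hypothetical, for illustration only):

    %% Hypothetical connector callback module (not part of this diff).
    -module(demo_batch_connector).

    -export([on_batch_query/3, handle_one/1]).

    %% Returns one result per request, in batch order. Returning a single
    %% `ok' / `{error, _}' for the whole batch still works as before.
    on_batch_query(_ResId, Batch, _State) ->
        [handle_one(Request) || Request <- Batch].

    %% `{send, Payload}' is an assumed request shape for this sketch.
    handle_one({send, _Payload}) ->
        ok;
    handle_one(_Unknown) ->
        {error, {unrecoverable_error, bad_request}}.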

View File

@@ -337,7 +337,7 @@ process_test_cmds([{push, N} | Tl], Cnt0) ->
     Cnt = Cnt0 + N,
     [{push, Cnt} | process_test_cmds(Tl, Cnt)].
 
-iqueue_print(I = #iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) ->
+iqueue_print(#iqueue{head = Hd, head_end = HdEnd, queue = Q, tail = Tl, tail_end = TlEnd}) ->
     #{
         hd => {Hd, HdEnd},
         tl => {Tl, TlEnd},

View File

@@ -114,6 +114,8 @@
     | {error, {recoverable_error, term()}}
     | {error, term()}.
 
+-type batch_query_result() :: query_result() | [query_result()].
+
 -type action_resource_id() :: resource_id().
 -type source_resource_id() :: resource_id().
 -type connector_resource_id() :: resource_id().

View File

@@ -170,7 +170,8 @@
 -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
 %% when calling emqx_resource:on_batch_query/3
--callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
+-callback on_batch_query(resource_id(), Request :: term(), resource_state()) ->
+    batch_query_result().
 
 %% when calling emqx_resource:on_query_async/4
 -callback on_query_async(
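
The async counterpart delivers results through the reply callback rather than the return value; a list handed to the reply function is treated the same way as a list returned from `on_batch_query/3`. A rough sketch, assuming the `{ReplyFun, Args}` shape used by the demo connector's `apply_reply/2` helper below, and reusing the hypothetical `handle_one/1` from the earlier sketch:

    %% Hypothetical async connector clause (illustration only).
    on_batch_query_async(_ResId, Batch, {ReplyFun, Args}, _State) ->
        Worker = spawn(fun() ->
            %% One result per request, in batch order.
            Results = [handle_one(Request) || Request <- Batch],
            %% Hand the list to the buffer worker via the reply fun.
            erlang:apply(ReplyFun, Args ++ [Results])
        end),
        {ok, Worker}.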

View File

@@ -806,14 +806,7 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
     {ShouldBlock, DeltaCounters}.
 
 batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
-    %% the `Mod:on_batch_query/3` returns a single result for a batch,
-    %% so we need to expand
-    Replies = lists:map(
-        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
-            ?REPLY(FROM, SENT, BatchResult)
-        end,
-        Batch
-    ),
+    Replies = expand_batch_reply(BatchResult, Batch),
     {ShouldAck, PostFns, Counters} =
         lists:foldl(
             fun(Reply, {_ShouldAck, PostFns, OldCounters}) ->
@@ -829,6 +822,21 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
     {ShouldAck, PostFn, Counters}.
 
+expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
+    lists:map(
+        fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) ->
+            ?REPLY(FROM, SENT, Result)
+        end,
+        lists:zip(Batch, BatchResults)
+    );
+expand_batch_reply(BatchResult, Batch) ->
+    lists:map(
+        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
+            ?REPLY(FROM, SENT, BatchResult)
+        end,
+        Batch
+    ).
+
 reply_caller(Id, Reply, QueryOpts) ->
     {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
     PostFn(),
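
The two `expand_batch_reply/2` clauses above carry the feature on the reply path: a list result is paired element-wise with its queries, while any other result is fanned out to every caller, preserving the old behavior. Note that `lists:zip/2` crashes when the lists differ in length, so a connector that returns a wrong-sized list fails loudly instead of mis-attributing results. A toy version of the same dispatch, with plain tuples standing in for the `?QUERY`/`?REPLY` records (illustration only):

    expand(Results, Batch) when is_list(Results) ->
        [{reply, From, Result} || {{query, From}, Result} <- lists:zip(Batch, Results)];
    expand(Result, Batch) ->
        [{reply, From, Result} || {query, From} <- Batch].

    %% expand([ok, {error, e}], [{query, a}, {query, b}])
    %%   returns [{reply, a, ok}, {reply, b, {error, e}}]
    %% expand(ok, [{query, a}, {query, b}])
    %%   returns [{reply, a, ok}, {reply, b, ok}]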
@@ -1465,7 +1473,7 @@ handle_async_batch_reply1(
 handle_async_batch_reply2([], _, _, _) ->
     %% this usually should never happen unless the async callback is being evaluated concurrently
     ok;
-handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
+handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) ->
     ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
     #{
         resource_id := Id,
@@ -1479,7 +1487,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     %% and put it back to the batch found in inflight table
     %% which must have already been set to `false`
     [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
-    {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
+    {RealNotExpired0, RealExpired, Results} =
+        sieve_expired_requests_with_results(RealBatch, Now, Results0),
     RealNotExpired =
         lists:map(
             fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
@@ -1508,7 +1517,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
                     num_inflight_messages => inflight_num_msgs(InflightTID)
                 }
             ),
-            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
+            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Results)
     end.
 
 do_handle_async_batch_reply(
@@ -1964,6 +1973,31 @@ sieve_expired_requests(Batch, Now) ->
         Batch
     ).
 
+sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) ->
+    %% individual results; we need to drop those that match expired queries
+    {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} =
+        lists:foldl(
+            fun(
+                {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result},
+                {NotExpAcc, ResAcc, ExpAcc}
+            ) ->
+                case not is_expired(ExpireAt, Now) of
+                    true ->
+                        {[Query | NotExpAcc], [Result | ResAcc], ExpAcc};
+                    false ->
+                        {NotExpAcc, ResAcc, [Query | ExpAcc]}
+                end
+            end,
+            {[], [], []},
+            lists:zip(Batch, Results)
+        ),
+    {lists:reverse(RevNotExpiredBatch), ExpiredBatch, lists:reverse(RevNotExpiredResults)};
+sieve_expired_requests_with_results(Batch, Now, Result) ->
+    %% one result for the whole batch, we just pass it along and
+    %% `batch_reply_caller_defer_metrics' will expand it
+    {NotExpiredBatch, ExpiredBatch} = sieve_expired_requests(Batch, Now),
+    {NotExpiredBatch, ExpiredBatch, Result}.
+
 -spec is_expired(infinity | integer(), integer()) -> boolean().
 is_expired(infinity = _ExpireAt, _Now) ->
     false;
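
Expiry handling is the subtle part of the async path: once results are per-request, dropping an expired query must also drop that query's result, or every later caller would receive its neighbor's outcome. `sieve_expired_requests_with_results/3` therefore zips queries with results and sieves them together, returning `{NotExpired, Expired, Results}` with `Results` aligned one-to-one with `NotExpired` (note the first clause's return tuple is fixed here to match the second clause and the caller's `{RealNotExpired0, RealExpired, Results}` binding). Illustrative shapes, with Q2 assumed expired (placeholders, not real records):

    %% sieve_expired_requests_with_results([Q1, Q2, Q3], Now, [R1, R2, R3])
    %%   returns {[Q1, Q3], [Q2], [R1, R3]}
    %% sieve_expired_requests_with_results([Q1, Q2, Q3], Now, ok)
    %%   returns {[Q1, Q3], [Q2], ok}    (single result passes through)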

View File

@@ -135,6 +135,15 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
     after 1000 ->
         {error, timeout}
     end;
+on_query(_InstId, {individual_reply, IsSuccess}, #{pid := Pid}) ->
+    ReqRef = make_ref(),
+    From = {self(), ReqRef},
+    Pid ! {From, {individual_reply, IsSuccess}},
+    receive
+        {ReqRef, Res} -> Res
+    after 1000 ->
+        {error, timeout}
+    end;
 on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => sync, for => For}),
     ReqRef = make_ref(),
@@ -169,6 +178,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
 on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
     Pid ! {big_payload, Payload, ReplyFun},
     {ok, Pid};
+on_query_async(_InstId, {individual_reply, IsSuccess}, ReplyFun, #{pid := Pid}) ->
+    Pid ! {individual_reply, IsSuccess, ReplyFun},
+    {ok, Pid};
 on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) ->
     ?tp(connector_demo_sleep, #{mode => async, for => For}),
     Pid ! {{sleep_before_reply, For}, ReplyFun},
@@ -184,6 +196,8 @@ on_batch_query(InstId, BatchReq, State) ->
             batch_get_counter(sync, InstId, State);
         {big_payload, _Payload} ->
             batch_big_payload(sync, InstId, BatchReq, State);
+        {individual_reply, _IsSuccess} ->
+            batch_individual_reply(sync, InstId, BatchReq, State);
         {random_reply, Num} ->
             %% async batch retried
             make_random_reply(Num)
@@ -200,6 +214,8 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) ->
             on_query_async(InstId, block_now, ReplyFunAndArgs, State);
         {big_payload, _Payload} ->
             batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State);
+        {individual_reply, _IsSuccess} ->
+            batch_individual_reply({async, ReplyFunAndArgs}, InstId, BatchReq, State);
         {random_reply, Num} ->
             %% only take the first Num in the batch should be random enough
             Pid ! {{random_reply, Num}, ReplyFunAndArgs},
@@ -243,6 +259,21 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) ->
     ),
     {ok, Pid}.
 
+batch_individual_reply(sync, InstId, Batch, State) ->
+    lists:map(
+        fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
+        Batch
+    );
+batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) ->
+    Pid = spawn(fun() ->
+        Results = lists:map(
+            fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
+            Batch
+        ),
+        apply_reply(ReplyFunAndArgs, Results)
+    end),
+    {ok, Pid}.
+
 on_get_status(_InstId, #{health_check_error := true}) ->
     ?tp(connector_demo_health_check_error, #{}),
     disconnected;
@@ -338,6 +369,22 @@ counter_loop(
         {{FromPid, ReqRef}, get} ->
             FromPid ! {ReqRef, Num},
             State;
+        {{FromPid, ReqRef}, {individual_reply, IsSuccess}} ->
+            Res =
+                case IsSuccess of
+                    true -> ok;
+                    false -> {error, {unrecoverable_error, bad_request}}
+                end,
+            FromPid ! {ReqRef, Res},
+            State;
+        {individual_reply, IsSuccess, ReplyFun} ->
+            Res =
+                case IsSuccess of
+                    true -> ok;
+                    false -> {error, {unrecoverable_error, bad_request}}
+                end,
+            apply_reply(ReplyFun, Res),
+            State;
         {{random_reply, RandNum}, ReplyFun} ->
             %% usually a behaving connector should reply once and only once for
             %% each (batch) request
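
The demo connector's new `{individual_reply, IsSuccess}` request makes the feature easy to exercise end to end: each request independently resolves to `ok` or `{error, {unrecoverable_error, bad_request}}`, and `batch_individual_reply/4` simply maps `on_query/3` over the batch, so the buffer worker receives one result per batch element. From a caller's point of view it looks like this (sketch; `Id` is a created resource id, and with `batch_size > 1` several such calls can be flushed as a single batch):

    %% Each caller gets the outcome of its own request, even when the
    %% requests were delivered to the connector in one batch.
    ok = emqx_resource:query(Id, {individual_reply, true}),
    {error, {unrecoverable_error, bad_request}} =
        emqx_resource:query(Id, {individual_reply, false}).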

View File

@@ -2678,6 +2678,85 @@ t_expiration_retry_batch_multiple_times(_Config) ->
     ),
     ok.
 
+t_batch_individual_reply_sync(_Config) ->
+    ResumeInterval = 300,
+    emqx_connector_demo:set_callback_mode(always_sync),
+    {ok, _} = create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 5,
+            batch_time => 100,
+            worker_pool_size => 1,
+            metrics_flush_interval => 50,
+            resume_interval => ResumeInterval
+        }
+    ),
+    do_t_batch_individual_reply().
+
+t_batch_individual_reply_async(_Config) ->
+    ResumeInterval = 300,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 5,
+            batch_time => 100,
+            worker_pool_size => 1,
+            metrics_flush_interval => 50,
+            resume_interval => ResumeInterval
+        }
+    ),
+    on_exit(fun() -> emqx_resource:remove_local(?ID) end),
+    do_t_batch_individual_reply().
+
+do_t_batch_individual_reply() ->
+    ?check_trace(
+        begin
+            {Results, {ok, _}} =
+                ?wait_async_action(
+                    emqx_utils:pmap(
+                        fun(N) ->
+                            emqx_resource:query(?ID, {individual_reply, N rem 2 =:= 0})
+                        end,
+                        lists:seq(1, 5)
+                    ),
+                    #{?snk_kind := buffer_worker_flush_ack, batch_or_query := [_, _ | _]},
+                    5_000
+                ),
+            Ok = ok,
+            Error = {error, {unrecoverable_error, bad_request}},
+            ?assertEqual([Error, Ok, Error, Ok, Error], Results),
+            ?retry(
+                200,
+                10,
+                ?assertMatch(
+                    #{
+                        counters := #{
+                            matched := 5,
+                            failed := 3,
+                            success := 2
+                        }
+                    },
+                    tap_metrics(?LINE)
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_recursive_flush(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
     {ok, _} = create(