feat(resource): allow `on_batch_query{,_async}` to return a list of individual results
Fixes https://emqx.atlassian.net/browse/EMQX-11892 This allows callers of batching resources to receive results specific to their requests, rather than a broad success or failure for the whole batch.
This commit is contained in:
parent
985a3e3062
commit
d003f77021
|
@ -114,6 +114,8 @@
|
|||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
|
||||
-type batch_query_result() :: query_result() | [query_result()].
|
||||
|
||||
-type action_resource_id() :: resource_id().
|
||||
-type source_resource_id() :: resource_id().
|
||||
-type connector_resource_id() :: resource_id().
|
||||
|
|
|
@ -170,7 +170,8 @@
|
|||
-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
|
||||
|
||||
%% when calling emqx_resource:on_batch_query/3
|
||||
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
|
||||
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) ->
|
||||
batch_query_result().
|
||||
|
||||
%% when calling emqx_resource:on_query_async/4
|
||||
-callback on_query_async(
|
||||
|
|
|
@ -806,14 +806,7 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
|
|||
{ShouldBlock, DeltaCounters}.
|
||||
|
||||
batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
|
||||
%% the `Mod:on_batch_query/3` returns a single result for a batch,
|
||||
%% so we need to expand
|
||||
Replies = lists:map(
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
|
||||
?REPLY(FROM, SENT, BatchResult)
|
||||
end,
|
||||
Batch
|
||||
),
|
||||
Replies = expand_batch_reply(BatchResult, Batch),
|
||||
{ShouldAck, PostFns, Counters} =
|
||||
lists:foldl(
|
||||
fun(Reply, {_ShouldAck, PostFns, OldCounters}) ->
|
||||
|
@ -829,6 +822,21 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
|
|||
PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
|
||||
{ShouldAck, PostFn, Counters}.
|
||||
|
||||
expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
|
||||
lists:map(
|
||||
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) ->
|
||||
?REPLY(FROM, SENT, Result)
|
||||
end,
|
||||
lists:zip(Batch, BatchResults)
|
||||
);
|
||||
expand_batch_reply(BatchResult, Batch) ->
|
||||
lists:map(
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
|
||||
?REPLY(FROM, SENT, BatchResult)
|
||||
end,
|
||||
Batch
|
||||
).
|
||||
|
||||
reply_caller(Id, Reply, QueryOpts) ->
|
||||
{ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
|
||||
PostFn(),
|
||||
|
@ -1465,7 +1473,7 @@ handle_async_batch_reply1(
|
|||
handle_async_batch_reply2([], _, _, _) ->
|
||||
%% this usually should never happen unless the async callback is being evaluated concurrently
|
||||
ok;
|
||||
handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||
handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) ->
|
||||
?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight,
|
||||
#{
|
||||
resource_id := Id,
|
||||
|
@ -1479,7 +1487,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
|||
%% and put it back to the batch found in inflight table
|
||||
%% which must have already been set to `false`
|
||||
[?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
|
||||
{RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now),
|
||||
{RealNotExpired0, RealExpired, Results} =
|
||||
sieve_expired_requests_with_results(RealBatch, Now, Results0),
|
||||
RealNotExpired =
|
||||
lists:map(
|
||||
fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
|
||||
|
@ -1508,7 +1517,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
|||
num_inflight_messages => inflight_num_msgs(InflightTID)
|
||||
}
|
||||
),
|
||||
do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
|
||||
do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Results)
|
||||
end.
|
||||
|
||||
do_handle_async_batch_reply(
|
||||
|
@ -1964,6 +1973,31 @@ sieve_expired_requests(Batch, Now) ->
|
|||
Batch
|
||||
).
|
||||
|
||||
sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) ->
|
||||
%% individual results; we need to drop those that match expired queries
|
||||
{RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} =
|
||||
lists:foldl(
|
||||
fun(
|
||||
{?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result},
|
||||
{NotExpAcc, ResAcc, ExpAcc}
|
||||
) ->
|
||||
case not is_expired(ExpireAt, Now) of
|
||||
true ->
|
||||
{[Query | NotExpAcc], [Result | ResAcc], ExpAcc};
|
||||
false ->
|
||||
{NotExpAcc, ResAcc, [Query | ExpAcc]}
|
||||
end
|
||||
end,
|
||||
{[], [], []},
|
||||
lists:zip(Batch, Results)
|
||||
),
|
||||
{lists:reverse(RevNotExpiredBatch), lists:reverse(RevNotExpiredResults), ExpiredBatch};
|
||||
sieve_expired_requests_with_results(Batch, Now, Result) ->
|
||||
%% one result for the whole batch, we just pass it along and
|
||||
%% `batch_reply_caller_defer_metrics' will expand it
|
||||
{NotExpiredBatch, ExpiredBatch} = sieve_expired_requests(Batch, Now),
|
||||
{NotExpiredBatch, ExpiredBatch, Result}.
|
||||
|
||||
-spec is_expired(infinity | integer(), integer()) -> boolean().
|
||||
is_expired(infinity = _ExpireAt, _Now) ->
|
||||
false;
|
||||
|
|
|
@ -135,6 +135,15 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
|
|||
after 1000 ->
|
||||
{error, timeout}
|
||||
end;
|
||||
on_query(_InstId, {individual_reply, IsSuccess}, #{pid := Pid}) ->
|
||||
ReqRef = make_ref(),
|
||||
From = {self(), ReqRef},
|
||||
Pid ! {From, {individual_reply, IsSuccess}},
|
||||
receive
|
||||
{ReqRef, Res} -> Res
|
||||
after 1000 ->
|
||||
{error, timeout}
|
||||
end;
|
||||
on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
|
||||
?tp(connector_demo_sleep, #{mode => sync, for => For}),
|
||||
ReqRef = make_ref(),
|
||||
|
@ -169,6 +178,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
|
|||
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
|
||||
Pid ! {big_payload, Payload, ReplyFun},
|
||||
{ok, Pid};
|
||||
on_query_async(_InstId, {individual_reply, IsSuccess}, ReplyFun, #{pid := Pid}) ->
|
||||
Pid ! {individual_reply, IsSuccess, ReplyFun},
|
||||
{ok, Pid};
|
||||
on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) ->
|
||||
?tp(connector_demo_sleep, #{mode => async, for => For}),
|
||||
Pid ! {{sleep_before_reply, For}, ReplyFun},
|
||||
|
@ -184,6 +196,8 @@ on_batch_query(InstId, BatchReq, State) ->
|
|||
batch_get_counter(sync, InstId, State);
|
||||
{big_payload, _Payload} ->
|
||||
batch_big_payload(sync, InstId, BatchReq, State);
|
||||
{individual_reply, _IsSuccess} ->
|
||||
batch_individual_reply(sync, InstId, BatchReq, State);
|
||||
{random_reply, Num} ->
|
||||
%% async batch retried
|
||||
make_random_reply(Num)
|
||||
|
@ -200,6 +214,8 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -
|
|||
on_query_async(InstId, block_now, ReplyFunAndArgs, State);
|
||||
{big_payload, _Payload} ->
|
||||
batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State);
|
||||
{individual_reply, _IsSuccess} ->
|
||||
batch_individual_reply({async, ReplyFunAndArgs}, InstId, BatchReq, State);
|
||||
{random_reply, Num} ->
|
||||
%% only take the first Num in the batch should be random enough
|
||||
Pid ! {{random_reply, Num}, ReplyFunAndArgs},
|
||||
|
@ -243,6 +259,21 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
|
|||
),
|
||||
{ok, Pid}.
|
||||
|
||||
batch_individual_reply(sync, InstId, Batch, State) ->
|
||||
lists:map(
|
||||
fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
|
||||
Batch
|
||||
);
|
||||
batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) ->
|
||||
Pid = spawn(fun() ->
|
||||
Results = lists:map(
|
||||
fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end,
|
||||
Batch
|
||||
),
|
||||
apply_reply(ReplyFunAndArgs, Results)
|
||||
end),
|
||||
{ok, Pid}.
|
||||
|
||||
on_get_status(_InstId, #{health_check_error := true}) ->
|
||||
?tp(connector_demo_health_check_error, #{}),
|
||||
disconnected;
|
||||
|
@ -338,6 +369,22 @@ counter_loop(
|
|||
{{FromPid, ReqRef}, get} ->
|
||||
FromPid ! {ReqRef, Num},
|
||||
State;
|
||||
{{FromPid, ReqRef}, {individual_reply, IsSuccess}} ->
|
||||
Res =
|
||||
case IsSuccess of
|
||||
true -> ok;
|
||||
false -> {error, {unrecoverable_error, bad_request}}
|
||||
end,
|
||||
FromPid ! {ReqRef, Res},
|
||||
State;
|
||||
{individual_reply, IsSuccess, ReplyFun} ->
|
||||
Res =
|
||||
case IsSuccess of
|
||||
true -> ok;
|
||||
false -> {error, {unrecoverable_error, bad_request}}
|
||||
end,
|
||||
apply_reply(ReplyFun, Res),
|
||||
State;
|
||||
{{random_reply, RandNum}, ReplyFun} ->
|
||||
%% usually a behaving connector should reply once and only once for
|
||||
%% each (batch) request
|
||||
|
|
|
@ -2678,6 +2678,85 @@ t_expiration_retry_batch_multiple_times(_Config) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
t_batch_individual_reply_sync(_Config) ->
|
||||
ResumeInterval = 300,
|
||||
emqx_connector_demo:set_callback_mode(always_sync),
|
||||
{ok, _} = create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => sync,
|
||||
batch_size => 5,
|
||||
batch_time => 100,
|
||||
worker_pool_size => 1,
|
||||
metrics_flush_interval => 50,
|
||||
resume_interval => ResumeInterval
|
||||
}
|
||||
),
|
||||
do_t_batch_individual_reply().
|
||||
|
||||
t_batch_individual_reply_async(_Config) ->
|
||||
ResumeInterval = 300,
|
||||
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||
{ok, _} = create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => sync,
|
||||
batch_size => 5,
|
||||
batch_time => 100,
|
||||
worker_pool_size => 1,
|
||||
metrics_flush_interval => 50,
|
||||
resume_interval => ResumeInterval
|
||||
}
|
||||
),
|
||||
on_exit(fun() -> emqx_resource:remove_local(?ID) end),
|
||||
do_t_batch_individual_reply().
|
||||
|
||||
do_t_batch_individual_reply() ->
|
||||
?check_trace(
|
||||
begin
|
||||
{Results, {ok, _}} =
|
||||
?wait_async_action(
|
||||
emqx_utils:pmap(
|
||||
fun(N) ->
|
||||
emqx_resource:query(?ID, {individual_reply, N rem 2 =:= 0})
|
||||
end,
|
||||
lists:seq(1, 5)
|
||||
),
|
||||
#{?snk_kind := buffer_worker_flush_ack, batch_or_query := [_, _ | _]},
|
||||
5_000
|
||||
),
|
||||
|
||||
Ok = ok,
|
||||
Error = {error, {unrecoverable_error, bad_request}},
|
||||
?assertEqual([Error, Ok, Error, Ok, Error], Results),
|
||||
|
||||
?retry(
|
||||
200,
|
||||
10,
|
||||
?assertMatch(
|
||||
#{
|
||||
counters := #{
|
||||
matched := 5,
|
||||
failed := 3,
|
||||
success := 2
|
||||
}
|
||||
},
|
||||
tap_metrics(?LINE)
|
||||
)
|
||||
),
|
||||
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
t_recursive_flush(_Config) ->
|
||||
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||
{ok, _} = create(
|
||||
|
|
Loading…
Reference in New Issue