From d003f77021a86f348a6fda0747e314e92914af5f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 21 Feb 2024 18:15:04 -0300 Subject: [PATCH] feat(resource): allow `on_batch_query{,_async}` to return a list of individual results Fixes https://emqx.atlassian.net/browse/EMQX-11892 This allows callers of batching resources to receive results specific to their requests, rather than a broad success or failure for the whole batch. --- apps/emqx_resource/include/emqx_resource.hrl | 2 + apps/emqx_resource/src/emqx_resource.erl | 3 +- .../src/emqx_resource_buffer_worker.erl | 56 ++++++++++--- .../test/emqx_connector_demo.erl | 47 +++++++++++ .../test/emqx_resource_SUITE.erl | 79 +++++++++++++++++++ 5 files changed, 175 insertions(+), 12 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 780745571..0828dfc60 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -114,6 +114,8 @@ | {error, {recoverable_error, term()}} | {error, term()}. +-type batch_query_result() :: query_result() | [query_result()]. + -type action_resource_id() :: resource_id(). -type source_resource_id() :: resource_id(). -type connector_resource_id() :: resource_id(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f2ab82950..575418f6b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -170,7 +170,8 @@ -callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result(). %% when calling emqx_resource:on_batch_query/3 --callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result(). +-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> + batch_query_result(). %% when calling emqx_resource:on_query_async/4 -callback on_query_async( diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0d3b9cf97..680b60078 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -806,14 +806,7 @@ batch_reply_caller(Id, BatchResult, Batch, QueryOpts) -> {ShouldBlock, DeltaCounters}. batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> - %% the `Mod:on_batch_query/3` returns a single result for a batch, - %% so we need to expand - Replies = lists:map( - fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> - ?REPLY(FROM, SENT, BatchResult) - end, - Batch - ), + Replies = expand_batch_reply(BatchResult, Batch), {ShouldAck, PostFns, Counters} = lists:foldl( fun(Reply, {_ShouldAck, PostFns, OldCounters}) -> @@ -829,6 +822,21 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, {ShouldAck, PostFn, Counters}. +expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) -> + lists:map( + fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) -> + ?REPLY(FROM, SENT, Result) + end, + lists:zip(Batch, BatchResults) + ); +expand_batch_reply(BatchResult, Batch) -> + lists:map( + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> + ?REPLY(FROM, SENT, BatchResult) + end, + Batch + ). + reply_caller(Id, Reply, QueryOpts) -> {ShouldAck, PostFn, DeltaCounters} = reply_caller_defer_metrics(Id, Reply, QueryOpts), PostFn(), @@ -1465,7 +1473,7 @@ handle_async_batch_reply1( handle_async_batch_reply2([], _, _, _) -> %% this usually should never happen unless the async callback is being evaluated concurrently ok; -handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> +handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _AsyncWorkerMRef) = Inflight, #{ resource_id := Id, @@ -1479,7 +1487,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% and put it back to the batch found in inflight table %% which must have already been set to `false` [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, - {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + {RealNotExpired0, RealExpired, Results} = + sieve_expired_requests_with_results(RealBatch, Now, Results0), RealNotExpired = lists:map( fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> @@ -1508,7 +1517,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> num_inflight_messages => inflight_num_msgs(InflightTID) } ), - do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) + do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Results) end. do_handle_async_batch_reply( @@ -1964,6 +1973,31 @@ sieve_expired_requests(Batch, Now) -> Batch ). +sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) -> + %% individual results; we need to drop those that match expired queries + {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} = + lists:foldl( + fun( + {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result}, + {NotExpAcc, ResAcc, ExpAcc} + ) -> + case not is_expired(ExpireAt, Now) of + true -> + {[Query | NotExpAcc], [Result | ResAcc], ExpAcc}; + false -> + {NotExpAcc, ResAcc, [Query | ExpAcc]} + end + end, + {[], [], []}, + lists:zip(Batch, Results) + ), + {lists:reverse(RevNotExpiredBatch), lists:reverse(RevNotExpiredResults), ExpiredBatch}; +sieve_expired_requests_with_results(Batch, Now, Result) -> + %% one result for the whole batch, we just pass it along and + %% `batch_reply_caller_defer_metrics' will expand it + {NotExpiredBatch, ExpiredBatch} = sieve_expired_requests(Batch, Now), + {NotExpiredBatch, ExpiredBatch, Result}. + -spec is_expired(infinity | integer(), integer()) -> boolean(). is_expired(infinity = _ExpireAt, _Now) -> false; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index b95d8c8bf..2d8e79cde 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -135,6 +135,15 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> after 1000 -> {error, timeout} end; +on_query(_InstId, {individual_reply, IsSuccess}, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, {individual_reply, IsSuccess}}, + receive + {ReqRef, Res} -> Res + after 1000 -> + {error, timeout} + end; on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => sync, for => For}), ReqRef = make_ref(), @@ -169,6 +178,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> Pid ! {big_payload, Payload, ReplyFun}, {ok, Pid}; +on_query_async(_InstId, {individual_reply, IsSuccess}, ReplyFun, #{pid := Pid}) -> + Pid ! {individual_reply, IsSuccess, ReplyFun}, + {ok, Pid}; on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => async, for => For}), Pid ! {{sleep_before_reply, For}, ReplyFun}, @@ -184,6 +196,8 @@ on_batch_query(InstId, BatchReq, State) -> batch_get_counter(sync, InstId, State); {big_payload, _Payload} -> batch_big_payload(sync, InstId, BatchReq, State); + {individual_reply, _IsSuccess} -> + batch_individual_reply(sync, InstId, BatchReq, State); {random_reply, Num} -> %% async batch retried make_random_reply(Num) @@ -200,6 +214,8 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) - on_query_async(InstId, block_now, ReplyFunAndArgs, State); {big_payload, _Payload} -> batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State); + {individual_reply, _IsSuccess} -> + batch_individual_reply({async, ReplyFunAndArgs}, InstId, BatchReq, State); {random_reply, Num} -> %% only take the first Num in the batch should be random enough Pid ! {{random_reply, Num}, ReplyFunAndArgs}, @@ -243,6 +259,21 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid} ), {ok, Pid}. +batch_individual_reply(sync, InstId, Batch, State) -> + lists:map( + fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end, + Batch + ); +batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) -> + Pid = spawn(fun() -> + Results = lists:map( + fun(Req = {individual_reply, _}) -> on_query(InstId, Req, State) end, + Batch + ), + apply_reply(ReplyFunAndArgs, Results) + end), + {ok, Pid}. + on_get_status(_InstId, #{health_check_error := true}) -> ?tp(connector_demo_health_check_error, #{}), disconnected; @@ -338,6 +369,22 @@ counter_loop( {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, State; + {{FromPid, ReqRef}, {individual_reply, IsSuccess}} -> + Res = + case IsSuccess of + true -> ok; + false -> {error, {unrecoverable_error, bad_request}} + end, + FromPid ! {ReqRef, Res}, + State; + {individual_reply, IsSuccess, ReplyFun} -> + Res = + case IsSuccess of + true -> ok; + false -> {error, {unrecoverable_error, bad_request}} + end, + apply_reply(ReplyFun, Res), + State; {{random_reply, RandNum}, ReplyFun} -> %% usually a behaving connector should reply once and only once for %% each (batch) request diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 873d30abd..1d0a3df2e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2678,6 +2678,85 @@ t_expiration_retry_batch_multiple_times(_Config) -> ), ok. +t_batch_individual_reply_sync(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 5, + batch_time => 100, + worker_pool_size => 1, + metrics_flush_interval => 50, + resume_interval => ResumeInterval + } + ), + do_t_batch_individual_reply(). + +t_batch_individual_reply_async(_Config) -> + ResumeInterval = 300, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 5, + batch_time => 100, + worker_pool_size => 1, + metrics_flush_interval => 50, + resume_interval => ResumeInterval + } + ), + on_exit(fun() -> emqx_resource:remove_local(?ID) end), + do_t_batch_individual_reply(). + +do_t_batch_individual_reply() -> + ?check_trace( + begin + {Results, {ok, _}} = + ?wait_async_action( + emqx_utils:pmap( + fun(N) -> + emqx_resource:query(?ID, {individual_reply, N rem 2 =:= 0}) + end, + lists:seq(1, 5) + ), + #{?snk_kind := buffer_worker_flush_ack, batch_or_query := [_, _ | _]}, + 5_000 + ), + + Ok = ok, + Error = {error, {unrecoverable_error, bad_request}}, + ?assertEqual([Error, Ok, Error, Ok, Error], Results), + + ?retry( + 200, + 10, + ?assertMatch( + #{ + counters := #{ + matched := 5, + failed := 3, + success := 2 + } + }, + tap_metrics(?LINE) + ) + ), + + ok + end, + [] + ), + ok. + t_recursive_flush(_Config) -> emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = create(