From d8d06a260ff1df921966166736b88bd7436e8274 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 6 Feb 2023 19:24:56 +0300 Subject: [PATCH] test(buffer): add test on inflight overflow w/ async queries This testcase should verify that the buffer will retry all inflight queries failed with recoverable errors + flush all outstanding queries. Co-authored-by: ieQu1 <99872536+ieQu1@users.noreply.github.com> --- .../test/emqx_connector_demo.erl | 33 ++++++++ .../test/emqx_resource_SUITE.erl | 84 +++++++++++++++---- 2 files changed, 103 insertions(+), 14 deletions(-) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 4e3423808..1d96fa083 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -134,6 +134,17 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> {ReqRef, Num} -> {ok, Num} after 1000 -> {error, timeout} + end; +on_query(_InstId, {sleep, For}, #{pid := Pid}) -> + ?tp(connector_demo_sleep, #{mode => sync, for => For}), + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, {sleep, For}}, + receive + {ReqRef, Result} -> + Result + after 1000 -> + {error, timeout} end. on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> @@ -147,6 +158,10 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> {ok, Pid}; on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> Pid ! {big_payload, Payload, ReplyFun}, + {ok, Pid}; +on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) -> + ?tp(connector_demo_sleep, #{mode => async, for => For}), + Pid ! {{sleep, For}, ReplyFun}, {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> @@ -283,10 +298,28 @@ counter_loop( State; {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, + State; + {{sleep, _} = SleepQ, ReplyFun} -> + apply_reply(ReplyFun, handle_query(async, SleepQ, Status)), + State; + {{FromPid, ReqRef}, {sleep, _} = SleepQ} -> + FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)}, State end, counter_loop(NewState). +handle_query(Mode, {sleep, For} = Query, Status) -> + ok = timer:sleep(For), + Result = + case Status of + running -> ok; + blocked -> {error, {recoverable_error, blocked}} + end, + ?tp(connector_demo_sleep_handled, #{ + mode => Mode, query => Query, slept => For, result => Result + }), + Result. + maybe_register(Name, Pid, true) -> ct:pal("---- Register Name: ~p", [Name]), ct:pal("---- whereis(): ~p", [whereis(Name)]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index bc0331d02..a042bfb67 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1452,6 +1452,61 @@ t_retry_async_inflight(_Config) -> ), ok. +t_retry_async_inflight_full(_Config) -> + ResumeInterval = 1_000, + AsyncInflightWindow = 5, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => ?FUNCTION_NAME}, + #{ + query_mode => async, + async_inflight_window => AsyncInflightWindow, + batch_size => 1, + batch_time => 20, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + #{timetrap => 15_000}, + begin + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel( + AsyncInflightWindow * 2, + fun() -> + For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4), + {sleep, For} + end, + #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}} + ), + #{?snk_kind := buffer_worker_flush_but_inflight_full}, + ResumeInterval * 2 + ), + + %% will reply with success after the resource is healed + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := buffer_worker_enter_running} + ), + ok + end, + [ + fun(Trace) -> + ?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace)) + end + ] + ), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + ok. + t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible), @@ -2241,18 +2296,16 @@ t_expiration_retry_batch_multiple_times(_Config) -> %%------------------------------------------------------------------------------ inc_counter_in_parallel(N) -> - inc_counter_in_parallel(N, #{}). + inc_counter_in_parallel(N, {inc_counter, 1}, #{}). inc_counter_in_parallel(N, Opts0) -> + inc_counter_in_parallel(N, {inc_counter, 1}, Opts0). + +inc_counter_in_parallel(N, Query, Opts) -> Parent = self(), Pids = [ erlang:spawn(fun() -> - Opts = - case is_function(Opts0) of - true -> Opts0(); - false -> Opts0 - end, - emqx_resource:query(?ID, {inc_counter, 1}, Opts), + emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)), Parent ! {complete, self()} end) || _ <- lists:seq(1, N) @@ -2267,16 +2320,11 @@ inc_counter_in_parallel(N, Opts0) -> ], ok. -inc_counter_in_parallel_increasing(N, StartN, Opts0) -> +inc_counter_in_parallel_increasing(N, StartN, Opts) -> Parent = self(), Pids = [ erlang:spawn(fun() -> - Opts = - case is_function(Opts0) of - true -> Opts0(); - false -> Opts0 - end, - emqx_resource:query(?ID, {inc_counter, M}, Opts), + emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)), Parent ! {complete, self()} end) || M <- lists:seq(StartN, StartN + N - 1) @@ -2290,6 +2338,14 @@ inc_counter_in_parallel_increasing(N, StartN, Opts0) -> || Pid <- Pids ]. +maybe_apply(FunOrTerm) -> + maybe_apply(FunOrTerm, []). + +maybe_apply(Fun, Args) when is_function(Fun) -> + erlang:apply(Fun, Args); +maybe_apply(Term, _Args) -> + Term. + bin_config() -> <<"\"name\": \"test_resource\"">>.