test(buffer): add test on inflight overflow w/ async queries
This testcase should verify that the buffer will retry all inflight queries failed with recoverable errors + flush all outstanding queries. Co-authored-by: ieQu1 <99872536+ieQu1@users.noreply.github.com>
This commit is contained in:
parent
cec8afe1b4
commit
d8d06a260f
|
@ -134,6 +134,17 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
|
||||||
{ReqRef, Num} -> {ok, Num}
|
{ReqRef, Num} -> {ok, Num}
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
|
end;
|
||||||
|
on_query(_InstId, {sleep, For}, #{pid := Pid}) ->
|
||||||
|
?tp(connector_demo_sleep, #{mode => sync, for => For}),
|
||||||
|
ReqRef = make_ref(),
|
||||||
|
From = {self(), ReqRef},
|
||||||
|
Pid ! {From, {sleep, For}},
|
||||||
|
receive
|
||||||
|
{ReqRef, Result} ->
|
||||||
|
Result
|
||||||
|
after 1000 ->
|
||||||
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
|
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
|
||||||
|
@ -147,6 +158,10 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
|
||||||
{ok, Pid};
|
{ok, Pid};
|
||||||
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
|
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
|
||||||
Pid ! {big_payload, Payload, ReplyFun},
|
Pid ! {big_payload, Payload, ReplyFun},
|
||||||
|
{ok, Pid};
|
||||||
|
on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) ->
|
||||||
|
?tp(connector_demo_sleep, #{mode => async, for => For}),
|
||||||
|
Pid ! {{sleep, For}, ReplyFun},
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
on_batch_query(InstId, BatchReq, State) ->
|
on_batch_query(InstId, BatchReq, State) ->
|
||||||
|
@ -283,10 +298,28 @@ counter_loop(
|
||||||
State;
|
State;
|
||||||
{{FromPid, ReqRef}, get} ->
|
{{FromPid, ReqRef}, get} ->
|
||||||
FromPid ! {ReqRef, Num},
|
FromPid ! {ReqRef, Num},
|
||||||
|
State;
|
||||||
|
{{sleep, _} = SleepQ, ReplyFun} ->
|
||||||
|
apply_reply(ReplyFun, handle_query(async, SleepQ, Status)),
|
||||||
|
State;
|
||||||
|
{{FromPid, ReqRef}, {sleep, _} = SleepQ} ->
|
||||||
|
FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)},
|
||||||
State
|
State
|
||||||
end,
|
end,
|
||||||
counter_loop(NewState).
|
counter_loop(NewState).
|
||||||
|
|
||||||
|
handle_query(Mode, {sleep, For} = Query, Status) ->
|
||||||
|
ok = timer:sleep(For),
|
||||||
|
Result =
|
||||||
|
case Status of
|
||||||
|
running -> ok;
|
||||||
|
blocked -> {error, {recoverable_error, blocked}}
|
||||||
|
end,
|
||||||
|
?tp(connector_demo_sleep_handled, #{
|
||||||
|
mode => Mode, query => Query, slept => For, result => Result
|
||||||
|
}),
|
||||||
|
Result.
|
||||||
|
|
||||||
maybe_register(Name, Pid, true) ->
|
maybe_register(Name, Pid, true) ->
|
||||||
ct:pal("---- Register Name: ~p", [Name]),
|
ct:pal("---- Register Name: ~p", [Name]),
|
||||||
ct:pal("---- whereis(): ~p", [whereis(Name)]),
|
ct:pal("---- whereis(): ~p", [whereis(Name)]),
|
||||||
|
|
|
@ -1452,6 +1452,61 @@ t_retry_async_inflight(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_retry_async_inflight_full(_Config) ->
|
||||||
|
ResumeInterval = 1_000,
|
||||||
|
AsyncInflightWindow = 5,
|
||||||
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||||
|
{ok, _} = emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => ?FUNCTION_NAME},
|
||||||
|
#{
|
||||||
|
query_mode => async,
|
||||||
|
async_inflight_window => AsyncInflightWindow,
|
||||||
|
batch_size => 1,
|
||||||
|
batch_time => 20,
|
||||||
|
worker_pool_size => 1,
|
||||||
|
resume_interval => ResumeInterval
|
||||||
|
}
|
||||||
|
),
|
||||||
|
?check_trace(
|
||||||
|
#{timetrap => 15_000},
|
||||||
|
begin
|
||||||
|
%% block
|
||||||
|
ok = emqx_resource:simple_sync_query(?ID, block),
|
||||||
|
|
||||||
|
{ok, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
inc_counter_in_parallel(
|
||||||
|
AsyncInflightWindow * 2,
|
||||||
|
fun() ->
|
||||||
|
For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4),
|
||||||
|
{sleep, For}
|
||||||
|
end,
|
||||||
|
#{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}}
|
||||||
|
),
|
||||||
|
#{?snk_kind := buffer_worker_flush_but_inflight_full},
|
||||||
|
ResumeInterval * 2
|
||||||
|
),
|
||||||
|
|
||||||
|
%% will reply with success after the resource is healed
|
||||||
|
{ok, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_resource:simple_sync_query(?ID, resume),
|
||||||
|
#{?snk_kind := buffer_worker_enter_running}
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch([#{} | _], ?of_kind(buffer_worker_flush_but_inflight_full, Trace))
|
||||||
|
end
|
||||||
|
]
|
||||||
|
),
|
||||||
|
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_retry_async_inflight_batch(_Config) ->
|
t_retry_async_inflight_batch(_Config) ->
|
||||||
ResumeInterval = 1_000,
|
ResumeInterval = 1_000,
|
||||||
emqx_connector_demo:set_callback_mode(async_if_possible),
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||||
|
@ -2241,18 +2296,16 @@ t_expiration_retry_batch_multiple_times(_Config) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
inc_counter_in_parallel(N) ->
|
inc_counter_in_parallel(N) ->
|
||||||
inc_counter_in_parallel(N, #{}).
|
inc_counter_in_parallel(N, {inc_counter, 1}, #{}).
|
||||||
|
|
||||||
inc_counter_in_parallel(N, Opts0) ->
|
inc_counter_in_parallel(N, Opts0) ->
|
||||||
|
inc_counter_in_parallel(N, {inc_counter, 1}, Opts0).
|
||||||
|
|
||||||
|
inc_counter_in_parallel(N, Query, Opts) ->
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
Pids = [
|
Pids = [
|
||||||
erlang:spawn(fun() ->
|
erlang:spawn(fun() ->
|
||||||
Opts =
|
emqx_resource:query(?ID, maybe_apply(Query), maybe_apply(Opts)),
|
||||||
case is_function(Opts0) of
|
|
||||||
true -> Opts0();
|
|
||||||
false -> Opts0
|
|
||||||
end,
|
|
||||||
emqx_resource:query(?ID, {inc_counter, 1}, Opts),
|
|
||||||
Parent ! {complete, self()}
|
Parent ! {complete, self()}
|
||||||
end)
|
end)
|
||||||
|| _ <- lists:seq(1, N)
|
|| _ <- lists:seq(1, N)
|
||||||
|
@ -2267,16 +2320,11 @@ inc_counter_in_parallel(N, Opts0) ->
|
||||||
],
|
],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
|
inc_counter_in_parallel_increasing(N, StartN, Opts) ->
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
Pids = [
|
Pids = [
|
||||||
erlang:spawn(fun() ->
|
erlang:spawn(fun() ->
|
||||||
Opts =
|
emqx_resource:query(?ID, {inc_counter, M}, maybe_apply(Opts)),
|
||||||
case is_function(Opts0) of
|
|
||||||
true -> Opts0();
|
|
||||||
false -> Opts0
|
|
||||||
end,
|
|
||||||
emqx_resource:query(?ID, {inc_counter, M}, Opts),
|
|
||||||
Parent ! {complete, self()}
|
Parent ! {complete, self()}
|
||||||
end)
|
end)
|
||||||
|| M <- lists:seq(StartN, StartN + N - 1)
|
|| M <- lists:seq(StartN, StartN + N - 1)
|
||||||
|
@ -2290,6 +2338,14 @@ inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
|
||||||
|| Pid <- Pids
|
|| Pid <- Pids
|
||||||
].
|
].
|
||||||
|
|
||||||
|
maybe_apply(FunOrTerm) ->
|
||||||
|
maybe_apply(FunOrTerm, []).
|
||||||
|
|
||||||
|
maybe_apply(Fun, Args) when is_function(Fun) ->
|
||||||
|
erlang:apply(Fun, Args);
|
||||||
|
maybe_apply(Term, _Args) ->
|
||||||
|
Term.
|
||||||
|
|
||||||
bin_config() ->
|
bin_config() ->
|
||||||
<<"\"name\": \"test_resource\"">>.
|
<<"\"name\": \"test_resource\"">>.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue