test: refactor to loop wait for counters

This commit is contained in:
Zaiming (Stone) Shi 2023-02-24 01:24:36 +01:00
parent a10dbba084
commit c97d17cc91
3 changed files with 52 additions and 56 deletions

View File

@ -77,7 +77,7 @@
blocked -> blocked ->
ok; ok;
ok -> ok ->
maybe_flush_after_async_reply(IsFullBefore) ok = maybe_flush_after_async_reply(IsFullBefore)
end end
end)() end)()
). ).
@ -486,15 +486,14 @@ flush(Data0) ->
Data1 = cancel_flush_timer(Data0), Data1 = cancel_flush_timer(Data0),
CurrentCount = queue_count(Q0), CurrentCount = queue_count(Q0),
IsFull = is_inflight_full(InflightTID), IsFull = is_inflight_full(InflightTID),
InflightCount = inflight_num_batches(InflightTID),
?tp(buffer_worker_flush, #{ ?tp(buffer_worker_flush, #{
queued => CurrentCount, queued => CurrentCount,
is_inflight_full => IsFull, is_inflight_full => IsFull,
inflight => InflightCount inflight => inflight_count(InflightTID)
}), }),
case {CurrentCount, IsFull} of case {CurrentCount, IsFull} of
{0, _} -> {0, _} ->
?tp(buffer_worker_queue_drained, #{inflight => InflightCount}), ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}),
{keep_state, Data1}; {keep_state, Data1};
{_, true} -> {_, true} ->
?tp(buffer_worker_flush_but_inflight_full, #{}), ?tp(buffer_worker_flush_but_inflight_full, #{}),
@ -626,7 +625,7 @@ do_flush(
flush_worker(self()); flush_worker(self());
false -> false ->
?tp(buffer_worker_queue_drained, #{ ?tp(buffer_worker_queue_drained, #{
inflight => inflight_num_batches(InflightTID) inflight => inflight_count(InflightTID)
}), }),
ok ok
end, end,
@ -707,7 +706,7 @@ do_flush(#{queue := Q1} = Data0, #{
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} -> {false, _} ->
?tp(buffer_worker_queue_drained, #{ ?tp(buffer_worker_queue_drained, #{
inflight => inflight_num_batches(InflightTID) inflight => inflight_count(InflightTID)
}), }),
Data1; Data1;
{true, true} -> {true, true} ->
@ -1336,10 +1335,10 @@ is_inflight_full(InflightTID) ->
[{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
%% we consider number of batches rather than number of messages %% we consider number of batches rather than number of messages
%% because one batch request may hold several messages. %% because one batch request may hold several messages.
Size = inflight_num_batches(InflightTID), Size = inflight_count(InflightTID),
Size >= MaxSize. Size >= MaxSize.
inflight_num_batches(InflightTID) -> inflight_count(InflightTID) ->
case ets:info(InflightTID, size) of case ets:info(InflightTID, size) of
undefined -> 0; undefined -> 0;
Size -> max(0, Size - ?INFLIGHT_META_ROWS) Size -> max(0, Size - ?INFLIGHT_META_ROWS)

View File

@ -176,7 +176,7 @@ on_batch_query(InstId, BatchReq, State) ->
batch_big_payload(sync, InstId, BatchReq, State); batch_big_payload(sync, InstId, BatchReq, State);
{random_reply, Num} -> {random_reply, Num} ->
%% async batch retried %% async batch retried
random_reply(Num) make_random_reply(Num)
end. end.
on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) ->
@ -313,11 +313,11 @@ counter_loop(
%% with 'ok' in the result, the buffer worker should eventually %% with 'ok' in the result, the buffer worker should eventually
%% drain the buffer (and inflights table) %% drain the buffer (and inflights table)
ReplyCount = 1 + (RandNum rem 3), ReplyCount = 1 + (RandNum rem 3),
Results = random_replies(ReplyCount), Results = make_random_replies(ReplyCount),
%% add a delay to trigger inflight full %% add a delay to trigger inflight full
timer:sleep(5),
lists:foreach( lists:foreach(
fun(Result) -> fun(Result) ->
timer:sleep(rand:uniform(5)),
apply_reply(ReplyFun, Result) apply_reply(ReplyFun, Result)
end, end,
Results Results
@ -354,12 +354,12 @@ maybe_register(_Name, _Pid, false) ->
apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
apply(ReplyFun, Args ++ [Result]). apply(ReplyFun, Args ++ [Result]).
random_replies(0) -> make_random_replies(0) ->
[]; [];
random_replies(N) -> make_random_replies(N) ->
[random_reply(N) | random_replies(N - 1)]. [make_random_reply(N) | make_random_replies(N - 1)].
random_reply(N) -> make_random_reply(N) ->
case rand:uniform(3) of case rand:uniform(3) of
1 -> 1 ->
{ok, N}; {ok, N};

View File

@ -1513,6 +1513,7 @@ t_async_reply_multi_eval(_Config) ->
ResumeInterval = 5, ResumeInterval = 5,
TotalTime = 5_000, TotalTime = 5_000,
AsyncInflightWindow = 3, AsyncInflightWindow = 3,
TotalQueries = AsyncInflightWindow * 5,
emqx_connector_demo:set_callback_mode(async_if_possible), emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create( {ok, _} = emqx_resource:create(
?ID, ?ID,
@ -1528,35 +1529,17 @@ t_async_reply_multi_eval(_Config) ->
resume_interval => ResumeInterval resume_interval => ResumeInterval
} }
), ),
?check_trace(
#{timetrap => 30_000},
begin
%% block %% block
ok = emqx_resource:simple_sync_query(?ID, block), ok = emqx_resource:simple_sync_query(?ID, block),
?wait_async_action(
inc_counter_in_parallel( inc_counter_in_parallel(
AsyncInflightWindow * 5, TotalQueries,
fun() -> fun() ->
Rand = rand:uniform(1000), Rand = rand:uniform(1000),
{random_reply, Rand} {random_reply, Rand}
end, end,
#{} #{}
), ),
#{?snk_kind := buffer_worker_queue_drained, inflight := 0}, F = fun() ->
TotalTime
),
ok
end,
[
fun(Trace) ->
?assertMatch(
[#{inflight := 0} | _],
lists:reverse(?of_kind(buffer_worker_queue_drained, Trace))
)
end
]
),
Metrics = tap_metrics(?LINE), Metrics = tap_metrics(?LINE),
#{ #{
counters := Counters, counters := Counters,
@ -1569,8 +1552,10 @@ t_async_reply_multi_eval(_Config) ->
late_reply := LateReply, late_reply := LateReply,
failed := Failed failed := Failed
} = Counters, } = Counters,
?assertEqual(Matched, Success + Dropped + LateReply + Failed), ?assertEqual(TotalQueries, Matched - 1),
ok. ?assertEqual(Matched, Success + Dropped + LateReply + Failed)
end,
loop_wait(F, _Interval = 5, TotalTime).
t_retry_async_inflight_batch(_Config) -> t_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000, ResumeInterval = 1_000,
@ -2637,3 +2622,15 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
) )
), ),
ok. ok.
loop_wait(F, Interval, TotalTime) when Interval >= TotalTime ->
%% do it for the last time
F();
loop_wait(F, Interval, TotalTime) ->
try
F()
catch
_:_ ->
timer:sleep(Interval),
loop_wait(F, Interval, TotalTime - Interval)
end.