test(resource): fix flaky test
This commit is contained in:
parent
687509886e
commit
d78312e10e
|
@ -1466,7 +1466,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
|
|||
end
|
||||
),
|
||||
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
||||
?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
|
||||
?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected}),
|
||||
ok.
|
||||
|
||||
%% used to update a batch after dropping expired individual queries.
|
||||
|
|
|
@ -1766,12 +1766,6 @@ t_async_pool_worker_death(_Config) ->
|
|||
?assertEqual(NumReqs, Inflight0),
|
||||
|
||||
%% grab one of the worker pids and kill it
|
||||
{ok, SRef1} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := buffer_worker_worker_down_update}),
|
||||
NumBufferWorkers,
|
||||
10_000
|
||||
),
|
||||
{ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
|
||||
MRef = monitor(process, Pid0),
|
||||
ct:pal("will kill ~p", [Pid0]),
|
||||
|
@ -1785,13 +1779,27 @@ t_async_pool_worker_death(_Config) ->
|
|||
end,
|
||||
|
||||
%% inflight requests should have been marked as retriable
|
||||
{ok, _} = snabbkaffe:receive_events(SRef1),
|
||||
wait_until_all_marked_as_retriable(NumReqs),
|
||||
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
|
||||
?assertEqual(NumReqs, Inflight1),
|
||||
|
||||
ok
|
||||
NumReqs
|
||||
end,
|
||||
[]
|
||||
fun(NumReqs, Trace) ->
|
||||
Events = ?of_kind(buffer_worker_async_agent_down, Trace),
|
||||
%% At least one buffer worker should have marked its
|
||||
%% requests as retriable. If a single one has
|
||||
%% received all requests, that's all we got.
|
||||
?assertMatch([_ | _], Events),
|
||||
%% All requests distributed over all buffer workers
|
||||
%% should have been marked as retriable, by the time
|
||||
%% the inflight has been drained.
|
||||
?assertEqual(
|
||||
NumReqs,
|
||||
lists:sum([N || #{num_affected := N} <- Events])
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -3017,3 +3025,33 @@ trace_between_span(Trace0, Marker) ->
|
|||
{Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
|
||||
{[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
|
||||
Trace2.
|
||||
|
||||
wait_until_all_marked_as_retriable(NumExpected) when NumExpected =< 0 ->
|
||||
ok;
|
||||
wait_until_all_marked_as_retriable(NumExpected) ->
|
||||
Seen = #{},
|
||||
do_wait_until_all_marked_as_retriable(NumExpected, Seen).
|
||||
|
||||
do_wait_until_all_marked_as_retriable(NumExpected, _Seen) when NumExpected =< 0 ->
|
||||
ok;
|
||||
do_wait_until_all_marked_as_retriable(NumExpected, Seen) ->
|
||||
Res = ?block_until(
|
||||
#{?snk_kind := buffer_worker_async_agent_down, ?snk_meta := #{pid := P}} when
|
||||
not is_map_key(P, Seen),
|
||||
10_000
|
||||
),
|
||||
case Res of
|
||||
{timeout, Evts} ->
|
||||
ct:pal("events so far:\n ~p", [Evts]),
|
||||
ct:fail("timeout waiting for events");
|
||||
{ok, #{num_affected := NumAffected, ?snk_meta := #{pid := Pid}}} ->
|
||||
ct:pal("affected: ~p; pid: ~p", [NumAffected, Pid]),
|
||||
case NumAffected >= NumExpected of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
do_wait_until_all_marked_as_retriable(NumExpected - NumAffected, Seen#{
|
||||
Pid => true
|
||||
})
|
||||
end
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue