diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2e2cd5631..7cb7f8198 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1466,7 +1466,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), - ?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}), + ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected}), ok. %% used to update a batch after dropping expired individual queries. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index f8ddd56b5..34781df6c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1766,12 +1766,6 @@ t_async_pool_worker_death(_Config) -> ?assertEqual(NumReqs, Inflight0), %% grab one of the worker pids and kill it - {ok, SRef1} = - snabbkaffe:subscribe( - ?match_event(#{?snk_kind := buffer_worker_worker_down_update}), - NumBufferWorkers, - 10_000 - ), {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state), MRef = monitor(process, Pid0), ct:pal("will kill ~p", [Pid0]), @@ -1785,13 +1779,27 @@ t_async_pool_worker_death(_Config) -> end, %% inflight requests should have been marked as retriable - {ok, _} = snabbkaffe:receive_events(SRef1), + wait_until_all_marked_as_retriable(NumReqs), Inflight1 = emqx_resource_metrics:inflight_get(?ID), ?assertEqual(NumReqs, Inflight1), - ok + NumReqs end, - [] + fun(NumReqs, Trace) -> + Events = ?of_kind(buffer_worker_async_agent_down, Trace), + %% At least one buffer worker should have marked its + %% requests as retriable. If a single one has + %% received all requests, that's all we got. + ?assertMatch([_ | _], Events), + %% All requests distributed over all buffer workers + %% should have been marked as retriable, by the time + %% the inflight has been drained. + ?assertEqual( + NumReqs, + lists:sum([N || #{num_affected := N} <- Events]) + ), + ok + end ), ok. @@ -3017,3 +3025,33 @@ trace_between_span(Trace0, Marker) -> {Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0), {[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1), Trace2. + +wait_until_all_marked_as_retriable(NumExpected) when NumExpected =< 0 -> + ok; +wait_until_all_marked_as_retriable(NumExpected) -> + Seen = #{}, + do_wait_until_all_marked_as_retriable(NumExpected, Seen). + +do_wait_until_all_marked_as_retriable(NumExpected, _Seen) when NumExpected =< 0 -> + ok; +do_wait_until_all_marked_as_retriable(NumExpected, Seen) -> + Res = ?block_until( + #{?snk_kind := buffer_worker_async_agent_down, ?snk_meta := #{pid := P}} when + not is_map_key(P, Seen), + 10_000 + ), + case Res of + {timeout, Evts} -> + ct:pal("events so far:\n ~p", [Evts]), + ct:fail("timeout waiting for events"); + {ok, #{num_affected := NumAffected, ?snk_meta := #{pid := Pid}}} -> + ct:pal("affected: ~p; pid: ~p", [NumAffected, Pid]), + case NumAffected >= NumExpected of + true -> + ok; + false -> + do_wait_until_all_marked_as_retriable(NumExpected - NumAffected, Seen#{ + Pid => true + }) + end + end.