Merge pull request #13335 from thalesmg/20240625-test-flaky-resource-expiration-retry-r572

test: attempt to fix flaky test
This commit is contained in:
Thales Macedo Garitezi 2024-06-26 15:15:33 -03:00 committed by GitHub
commit 7dea8e08b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 35 additions and 22 deletions

View File

@ -29,6 +29,9 @@
-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
-define(TELEMETRY_PREFIX, emqx, resource). -define(TELEMETRY_PREFIX, emqx, resource).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) ->
resume_interval => 300 resume_interval => 300
} }
), ),
do_t_expiration_retry(). do_t_expiration_retry(#{is_batch => false}).
t_expiration_retry_batch(_Config) -> t_expiration_retry_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync), emqx_connector_demo:set_callback_mode(always_sync),
@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) ->
resume_interval => 300 resume_interval => 300
} }
), ),
do_t_expiration_retry(). do_t_expiration_retry(#{is_batch => true}).
do_t_expiration_retry() -> do_t_expiration_retry(Context) ->
IsBatch = maps:get(is_batch, Context),
ResumeInterval = 300, ResumeInterval = 300,
?check_trace( ?check_trace(
#{timetrap => 10_000},
begin begin
ok = emqx_resource:simple_sync_query(?ID, block), ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef0} = snabbkaffe:subscribe( TimeoutMS = 200,
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
%% the request that expires must be first, so it's the %% the request that expires must be first, so it's the
%% head of the inflight table (and retriable). %% head of the inflight table (and retriable).
{ok, SRef1} = snabbkaffe:subscribe( {ok, SRef1} = snabbkaffe:subscribe(
@ -2542,6 +2542,8 @@ do_t_expiration_retry() ->
) )
) )
end), end),
%% This second message must be enqueued while the resource is blocked by the
%% previous message.
Pid1 = Pid1 =
spawn_link(fun() -> spawn_link(fun() ->
receive receive
@ -2556,22 +2558,33 @@ do_t_expiration_retry() ->
) )
) )
end), end),
?tp("waiting for first message to be appended to the queue", #{}),
{ok, _} = snabbkaffe:receive_events(SRef1), {ok, _} = snabbkaffe:receive_events(SRef1),
?tp("waiting for first message to expire during blocked retries", #{}),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}),
%% Now we wait until the worker tries the second message at least once before
%% unblocking it.
Pid1 ! go, Pid1 ! go,
{ok, _} = snabbkaffe:receive_events(SRef0), ?tp("waiting for second message to be retried and be nacked while blocked", #{}),
case IsBatch of
false ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _)
});
true ->
{ok, _} = ?block_until(#{
?snk_kind := buffer_worker_flush_nack,
batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _]
})
end,
{ok, _} = %% Bypass the buffer worker and unblock the resource.
?block_until( ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_expired}, ?tp("waiting for second message to be retried and be acked, unblocking", #{}),
ResumeInterval * 10 {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}),
),
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := buffer_worker_retry_inflight_succeeded},
ResumeInterval * 5
),
ok ok
end, end,