From 954adc71c4b899f4ca6bfefe68230338508247ae Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 25 Jun 2024 17:55:31 -0300 Subject: [PATCH] test: attempt to fix flaky test https://github.com/emqx/emqx/actions/runs/9662725303/job/26653594859?pr=13328#step:6:186 ``` %%% emqx_resource_SUITE ==> t_expiration_retry: FAILED %%% emqx_resource_SUITE ==> {{panic, #{msg => "Unexpected result", result => {run_stage_failed,error, {badmatch,{ok,timeout}}, [{emqx_resource_SUITE,'-do_t_expiration_retry/0-fun-12-',0, [{file, "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2569}]}, {emqx_resource_SUITE,do_t_expiration_retry,0, [{file, "/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2518}]}]}}}, [{emqx_resource_SUITE,do_t_expiration_retry,0, [{file,"/__w/emqx/emqx/apps/emqx_resource/test/emqx_resource_SUITE.erl"}, {line,2594}]}, {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]}, {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1302}]}, {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1234}]}]} ``` --- .../test/emqx_resource_SUITE.erl | 57 ++++++++++++------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 764c65e6f..af9abe95b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -29,6 +29,9 @@ -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). -define(TELEMETRY_PREFIX, emqx, resource). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX), + {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} +). -import(emqx_common_test_helpers, [on_exit/1]). @@ -2494,7 +2497,7 @@ t_expiration_retry(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => false}). t_expiration_retry_batch(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), @@ -2511,20 +2514,17 @@ t_expiration_retry_batch(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(). + do_t_expiration_retry(#{is_batch => true}). -do_t_expiration_retry() -> +do_t_expiration_retry(Context) -> + IsBatch = maps:get(is_batch, Context), ResumeInterval = 300, ?check_trace( + #{timetrap => 10_000}, begin ok = emqx_resource:simple_sync_query(?ID, block), - {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := buffer_worker_flush_nack}), - 1, - 200 - ), - TimeoutMS = 100, + TimeoutMS = 200, %% the request that expires must be first, so it's the %% head of the inflight table (and retriable). {ok, SRef1} = snabbkaffe:subscribe( @@ -2542,6 +2542,8 @@ do_t_expiration_retry() -> ) ) end), + %% This second message must be enqueued while the resource is blocked by the + %% previous message. Pid1 = spawn_link(fun() -> receive @@ -2556,22 +2558,33 @@ do_t_expiration_retry() -> ) ) end), + ?tp("waiting for first message to be appended to the queue", #{}), {ok, _} = snabbkaffe:receive_events(SRef1), + + ?tp("waiting for first message to expire during blocked retries", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_expired}), + + %% Now we wait until the worker tries the second message at least once before + %% unblocking it. Pid1 ! go, - {ok, _} = snabbkaffe:receive_events(SRef0), + ?tp("waiting for second message to be retried and be nacked while blocked", #{}), + case IsBatch of + false -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := ?QUERY(_, {inc_counter, 2}, _, _, _) + }); + true -> + {ok, _} = ?block_until(#{ + ?snk_kind := buffer_worker_flush_nack, + batch_or_query := [?QUERY(_, {inc_counter, 2}, _, _, _) | _] + }) + end, - {ok, _} = - ?block_until( - #{?snk_kind := buffer_worker_retry_expired}, - ResumeInterval * 10 - ), - - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := buffer_worker_retry_inflight_succeeded}, - ResumeInterval * 5 - ), + %% Bypass the buffer worker and unblock the resource. + ok = emqx_resource:simple_sync_query(?ID, resume), + ?tp("waiting for second message to be retried and be acked, unblocking", #{}), + {ok, _} = ?block_until(#{?snk_kind := buffer_worker_retry_inflight_succeeded}), ok end,