Merge pull request #12036 from thalesmg/test-more-flaky-tests-r54-20231127

test: attempting to stabilize more flaky tests
Thales Macedo Garitezi 2023-11-28 13:40:49 -03:00 committed by GitHub
commit f6f61f4353
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
2 changed files with 33 additions and 10 deletions


@@ -478,7 +478,6 @@ do_pull_async(State0) ->
     Body = body(State0, pull),
     PreparedRequest = {prepared_request, {Method, Path, Body}},
     ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
-    %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
     Res = emqx_bridge_gcp_pubsub_client:query_async(
         PreparedRequest,
         ReplyFunAndArgs,
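For context on the call being touched here: `ReplyFunAndArgs' is the async-reply hook that the GCP Pub/Sub client invokes with the prepended arguments plus the HTTP result, so it ends up calling `reply_delegator(WorkerPid, pull_async, InstanceId, Response)'. A minimal sketch of a delegator with that shape (the message tag below is hypothetical, not the module's actual code):

%% Hypothetical sketch only: forward the async HTTP result back to the
%% worker process that started the pull. The message tag is made up.
reply_delegator(WorkerPid, pull_async, InstanceId, Response) ->
    WorkerPid ! {pull_async_response, InstanceId, Response},
    ok.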


@@ -208,7 +208,7 @@ consumer_config(TestCase, Config) ->
             "  resource_opts {\n"
             "    health_check_interval = \"1s\"\n"
             %% to fail and retry pulling faster
-            "    request_ttl = \"5s\"\n"
+            "    request_ttl = \"1s\"\n"
             "  }\n"
             "}\n",
         [
@@ -285,7 +285,7 @@ start_control_client() ->
        connect_timeout => 5_000,
        max_retries => 0,
        pool_size => 1,
-       resource_opts => #{request_ttl => 5_000},
+       resource_opts => #{request_ttl => 1_000},
        service_account_json => RawServiceAccount
    },
    PoolName = <<"control_connector">>,
@@ -512,10 +512,16 @@ wait_acked(Opts) ->
    %% no need to check return value; we check the property in
    %% the check phase. this is just to give it a chance to do
    %% so and avoid flakiness. should be fast.
-   snabbkaffe:block_until(
+   Res = snabbkaffe:block_until(
        ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
        Timeout
    ),
+   case Res of
+       {ok, _} ->
+           ok;
+       {timeout, Evts} ->
+           ct:pal("timed out waiting for acks; received:\n  ~p", [Evts])
+   end,
    ok.

 wait_forgotten() ->
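The pattern introduced above, tolerating a timeout from snabbkaffe:block_until/2 and only logging it because the real assertion runs in the check phase, can be written as a standalone helper. A sketch, assuming the suite includes the usual snabbkaffe header for ?match_n_events and ?snk_kind (the helper name and event kind argument are placeholders):

%% Sketch of the same "wait, but don't fail here" idea in isolation.
%% Assumes -include_lib("snabbkaffe/include/snabbkaffe.hrl").
wait_for_events(Kind, N, Timeout) ->
    Res = snabbkaffe:block_until(
        ?match_n_events(N, #{?snk_kind := Kind}),
        Timeout
    ),
    case Res of
        {ok, _Events} ->
            ok;
        {timeout, PartialEvents} ->
            %% only log; the property itself is asserted in the check phase
            ct:pal("timed out waiting for ~p events; received:\n  ~p", [Kind, PartialEvents])
    end,
    ok.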
@@ -1265,11 +1271,12 @@ t_multiple_pull_workers(Config) ->
            <<"consumer">> => #{
                %% reduce flakiness
                <<"ack_deadline">> => <<"10m">>,
+               <<"ack_retry_interval">> => <<"1s">>,
                <<"consumer_workers_per_topic">> => NConsumers
            },
            <<"resource_opts">> => #{
                %% reduce flakiness
-               <<"request_ttl">> => <<"15s">>
+               <<"request_ttl">> => <<"11s">>
            }
        }
    ),
@@ -1531,11 +1538,12 @@ t_async_worker_death_mid_pull(Config) ->
            ct:pal("published message"),

            AsyncWorkerPids = get_async_worker_pids(Config),
+           Timeout = 20_000,
            emqx_utils:pmap(
                fun(AsyncWorkerPid) ->
                    Ref = monitor(process, AsyncWorkerPid),
                    ct:pal("killing pid ~p", [AsyncWorkerPid]),
-                   sys:terminate(AsyncWorkerPid, die, 20_000),
+                   sys:terminate(AsyncWorkerPid, die, Timeout),
                    receive
                        {'DOWN', Ref, process, AsyncWorkerPid, _} ->
                            ct:pal("killed pid ~p", [AsyncWorkerPid]),
@@ -1544,7 +1552,8 @@ t_async_worker_death_mid_pull(Config) ->
                    end,
                    ok
                end,
-               AsyncWorkerPids
+               AsyncWorkerPids,
+               Timeout + 2_000
            ),
            ok
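The timeout layering added in this hunk is the point of the change: emqx_utils:pmap/3 takes an overall deadline as its third argument (as used above), and that deadline has to exceed the per-task sys:terminate/3 timeout, otherwise pmap can give up while a task is still legitimately waiting for the worker to die. A compact sketch of the same relationship (helper name and values are illustrative):

%% Illustrative only: kill a list of workers in parallel, giving the
%% outer pmap a deadline slightly above the per-task timeout.
kill_workers(Pids) ->
    TaskTimeout = 20_000,
    emqx_utils:pmap(
        fun(Pid) ->
            MRef = monitor(process, Pid),
            sys:terminate(Pid, die, TaskTimeout),
            receive
                {'DOWN', MRef, process, Pid, _Reason} -> ok
            after TaskTimeout ->
                ct:fail("worker ~p did not terminate", [Pid])
            end
        end,
        Pids,
        TaskTimeout + 2_000  %% outer deadline > per-task timeout
    ).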
@@ -1558,7 +1567,13 @@ t_async_worker_death_mid_pull(Config) ->
                ?wait_async_action(
                    create_bridge(
                        Config,
-                       #{<<"pool_size">> => 1}
+                       #{
+                           <<"pool_size">> => 1,
+                           <<"consumer">> => #{
+                               <<"ack_deadline">> => <<"10s">>,
+                               <<"ack_retry_interval">> => <<"1s">>
+                           }
+                       }
                    ),
                    #{?snk_kind := gcp_pubsub_consumer_worker_init},
                    10_000
@@ -1888,7 +1903,10 @@ t_connection_down_during_ack(Config) ->
        {{ok, _}, {ok, _}} =
            ?wait_async_action(
-               create_bridge(Config),
+               create_bridge(
+                   Config,
+                   #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}}
+               ),
                #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                10_000
            ),
@@ -2026,7 +2044,13 @@ t_connection_down_during_pull(Config) ->
        {{ok, _}, {ok, _}} =
            ?wait_async_action(
-               create_bridge(Config),
+               create_bridge(
+                   Config,
+                   #{
+                       <<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>},
+                       <<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>}
+                   }
+               ),
                #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
                10_000
            ),