From 317b29451fbf71115569009a572e948d220f8b5a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 1 Jul 2024 16:11:03 -0300 Subject: [PATCH] test(gcp consumer): attempt to stabilize flaky test ``` %%% emqx_bridge_kafka_impl_consumer_SUITE ==> ssl.t_start_and_consume_ok: FAILED %%% emqx_bridge_kafka_impl_consumer_SUITE ==> {{panic, #{msg => "Unexpected result", result => {run_stage_failed,error, {badmatch,{{1,0},timeout}}, [{emqx_bridge_kafka_impl_consumer_SUITE, '-t_start_and_consume_ok/1-fun-12-',4, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1184}]}, {emqx_bridge_kafka_impl_consumer_SUITE, t_start_and_consume_ok,1, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1171}]}]}}}, [{emqx_bridge_kafka_impl_consumer_SUITE,t_start_and_consume_ok,1, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1240}]}, ``` --- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 9450d02f0..0e6956d58 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -519,6 +519,7 @@ wait_acked(Opts) -> %% no need to check return value; we check the property in %% the check phase. this is just to give it a chance to do %% so and avoid flakiness. should be fast. + ct:pal("waiting ~b ms until acked...", [Timeout]), Res = snabbkaffe:block_until( ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), Timeout @@ -1265,11 +1266,12 @@ t_multiple_topic_mappings(Config) -> %% 2+ pull workers do not duplicate delivered messages t_multiple_pull_workers(Config) -> - ct:timetrap({seconds, 120}), + ct:timetrap({seconds, 121}), BridgeName = ?config(consumer_name, Config), TopicMapping = ?config(topic_mapping, Config), ResourceId = resource_id(Config), ?check_trace( + #{timetrap => 120_000}, begin NConsumers = 3, start_and_subscribe_mqtt(Config), @@ -1277,7 +1279,7 @@ t_multiple_pull_workers(Config) -> snabbkaffe:subscribe( ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}), NConsumers, - 40_000 + infinity ), {ok, _} = create_bridge( Config, @@ -1287,6 +1289,14 @@ t_multiple_pull_workers(Config) -> <<"ack_deadline">> => <<"10m">>, <<"ack_retry_interval">> => <<"1s">>, <<"consumer_workers_per_topic">> => NConsumers + }, + <<"resource_opts">> => #{ + %% Used by worker when patching subscritpion; we increase it a bit + %% here because (at least) the gcp emulator tends to time out / + %% throttle (?) workers targeting the same subscription, making + %% the test flakier. + <<"request_ttl">> => <<"5s">>, + <<"resume_interval">> => <<"1s">> } } ), @@ -1294,6 +1304,14 @@ t_multiple_pull_workers(Config) -> [#{pubsub_topic := Topic}] = TopicMapping, Payload = emqx_guid:to_hexstr(emqx_guid:gen()), Messages = [#{<<"data">> => Payload}], + ?retry( + 500, + 20, + ?assertMatch( + {ok, connected}, + health_check(Config) + ) + ), pubsub_publish(Config, Topic, Messages), {ok, Published} = receive_published(), ?assertMatch(