From 5128c115421d286eb33e3f2ae9f710cd38271123 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 18 Dec 2023 17:12:10 -0300 Subject: [PATCH] test(gcp_pubsub_consumer): another attempt to stabilize flaky tests --- .../src/emqx_bridge_gcp_pubsub_consumer_worker.erl | 1 + .../test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 13 ++++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 93f8fd8c3..f860e3635 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -566,6 +566,7 @@ do_acknowledge(State0) -> Path = path(State1, ack), Body = body(State1, ack, #{ack_ids => AckIds}), PreparedRequest = {prepared_request, {Method, Path, Body}}, + ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index b747e9262..41c3e1883 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -706,7 +706,9 @@ prop_all_pulled_are_acked(Trace) -> || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs ], - AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)), + %% we just need to check that it _tries_ to ack each id; the result itself doesn't + %% matter, as it might timeout. + AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_will_acknowledge, Trace)), AckedMsgIds1 = [ MsgId || PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks) @@ -1172,7 +1174,12 @@ t_multiple_topic_mappings(Config) -> ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{ + <<"consumer">> => #{<<"ack_deadline">> => <<"10m">>} + } + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 40_000 ) @@ -1233,7 +1240,7 @@ t_multiple_topic_mappings(Config) -> ], Published ), - wait_acked(#{n => 2}), + ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 20_000), ?retry( _Interval = 200, _NAttempts = 20,