test(gcp_pubsub_consumer): another attempt to stabilize flaky tests

This commit is contained in:
Thales Macedo Garitezi 2023-12-18 17:12:10 -03:00
parent 2d11aca39f
commit 5128c11542
2 changed files with 11 additions and 3 deletions

View File

@ -566,6 +566,7 @@ do_acknowledge(State0) ->
Path = path(State1, ack), Path = path(State1, ack),
Body = body(State1, ack, #{ack_ids => AckIds}), Body = body(State1, ack, #{ack_ids => AckIds}),
PreparedRequest = {prepared_request, {Method, Path, Body}}, PreparedRequest = {prepared_request, {Method, Path, Body}},
?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}),
Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
case Res of case Res of
{error, Reason} -> {error, Reason} ->

View File

@ -706,7 +706,9 @@ prop_all_pulled_are_acked(Trace) ->
|| #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
#{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
], ],
AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)), %% we just need to check that it _tries_ to ack each id; the result itself doesn't
%% matter, as it might timeout.
AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_will_acknowledge, Trace)),
AckedMsgIds1 = [ AckedMsgIds1 = [
MsgId MsgId
|| PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks) || PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks)
@ -1172,7 +1174,12 @@ t_multiple_topic_mappings(Config) ->
?assertMatch( ?assertMatch(
{{ok, _}, {ok, _}}, {{ok, _}, {ok, _}},
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(
Config,
#{
<<"consumer">> => #{<<"ack_deadline">> => <<"10m">>}
}
),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
40_000 40_000
) )
@ -1233,7 +1240,7 @@ t_multiple_topic_mappings(Config) ->
], ],
Published Published
), ),
wait_acked(#{n => 2}), ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 20_000),
?retry( ?retry(
_Interval = 200, _Interval = 200,
_NAttempts = 20, _NAttempts = 20,