test(gcp consumer): attempt to stabilize flaky test
``` %%% emqx_bridge_kafka_impl_consumer_SUITE ==> ssl.t_start_and_consume_ok: FAILED %%% emqx_bridge_kafka_impl_consumer_SUITE ==> {{panic, #{msg => "Unexpected result", result => {run_stage_failed,error, {badmatch,{{1,0},timeout}}, [{emqx_bridge_kafka_impl_consumer_SUITE, '-t_start_and_consume_ok/1-fun-12-',4, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1184}]}, {emqx_bridge_kafka_impl_consumer_SUITE, t_start_and_consume_ok,1, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1171}]}]}}}, [{emqx_bridge_kafka_impl_consumer_SUITE,t_start_and_consume_ok,1, [{file, "/emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl"}, {line,1240}]}, ```
This commit is contained in:
parent
c04e93838f
commit
317b29451f
|
@ -519,6 +519,7 @@ wait_acked(Opts) ->
|
||||||
%% no need to check return value; we check the property in
|
%% no need to check return value; we check the property in
|
||||||
%% the check phase. this is just to give it a chance to do
|
%% the check phase. this is just to give it a chance to do
|
||||||
%% so and avoid flakiness. should be fast.
|
%% so and avoid flakiness. should be fast.
|
||||||
|
ct:pal("waiting ~b ms until acked...", [Timeout]),
|
||||||
Res = snabbkaffe:block_until(
|
Res = snabbkaffe:block_until(
|
||||||
?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
|
?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
|
||||||
Timeout
|
Timeout
|
||||||
|
@ -1265,11 +1266,12 @@ t_multiple_topic_mappings(Config) ->
|
||||||
|
|
||||||
%% 2+ pull workers do not duplicate delivered messages
|
%% 2+ pull workers do not duplicate delivered messages
|
||||||
t_multiple_pull_workers(Config) ->
|
t_multiple_pull_workers(Config) ->
|
||||||
ct:timetrap({seconds, 120}),
|
ct:timetrap({seconds, 121}),
|
||||||
BridgeName = ?config(consumer_name, Config),
|
BridgeName = ?config(consumer_name, Config),
|
||||||
TopicMapping = ?config(topic_mapping, Config),
|
TopicMapping = ?config(topic_mapping, Config),
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
#{timetrap => 120_000},
|
||||||
begin
|
begin
|
||||||
NConsumers = 3,
|
NConsumers = 3,
|
||||||
start_and_subscribe_mqtt(Config),
|
start_and_subscribe_mqtt(Config),
|
||||||
|
@ -1277,7 +1279,7 @@ t_multiple_pull_workers(Config) ->
|
||||||
snabbkaffe:subscribe(
|
snabbkaffe:subscribe(
|
||||||
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}),
|
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}),
|
||||||
NConsumers,
|
NConsumers,
|
||||||
40_000
|
infinity
|
||||||
),
|
),
|
||||||
{ok, _} = create_bridge(
|
{ok, _} = create_bridge(
|
||||||
Config,
|
Config,
|
||||||
|
@ -1287,6 +1289,14 @@ t_multiple_pull_workers(Config) ->
|
||||||
<<"ack_deadline">> => <<"10m">>,
|
<<"ack_deadline">> => <<"10m">>,
|
||||||
<<"ack_retry_interval">> => <<"1s">>,
|
<<"ack_retry_interval">> => <<"1s">>,
|
||||||
<<"consumer_workers_per_topic">> => NConsumers
|
<<"consumer_workers_per_topic">> => NConsumers
|
||||||
|
},
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
%% Used by worker when patching subscritpion; we increase it a bit
|
||||||
|
%% here because (at least) the gcp emulator tends to time out /
|
||||||
|
%% throttle (?) workers targeting the same subscription, making
|
||||||
|
%% the test flakier.
|
||||||
|
<<"request_ttl">> => <<"5s">>,
|
||||||
|
<<"resume_interval">> => <<"1s">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -1294,6 +1304,14 @@ t_multiple_pull_workers(Config) ->
|
||||||
[#{pubsub_topic := Topic}] = TopicMapping,
|
[#{pubsub_topic := Topic}] = TopicMapping,
|
||||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
Messages = [#{<<"data">> => Payload}],
|
Messages = [#{<<"data">> => Payload}],
|
||||||
|
?retry(
|
||||||
|
500,
|
||||||
|
20,
|
||||||
|
?assertMatch(
|
||||||
|
{ok, connected},
|
||||||
|
health_check(Config)
|
||||||
|
)
|
||||||
|
),
|
||||||
pubsub_publish(Config, Topic, Messages),
|
pubsub_publish(Config, Topic, Messages),
|
||||||
{ok, Published} = receive_published(),
|
{ok, Published} = receive_published(),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
|
Loading…
Reference in New Issue