diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 84a4e6d13..6b64a02e9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -478,7 +478,6 @@ do_pull_async(State0) -> Body = body(State0, pull), PreparedRequest = {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, - %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers... Res = emqx_bridge_gcp_pubsub_client:query_async( PreparedRequest, ReplyFunAndArgs, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index d82a61fee..7e90ab48a 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -208,7 +208,7 @@ consumer_config(TestCase, Config) -> " resource_opts {\n" " health_check_interval = \"1s\"\n" %% to fail and retry pulling faster - " request_ttl = \"5s\"\n" + " request_ttl = \"1s\"\n" " }\n" "}\n", [ @@ -285,7 +285,7 @@ start_control_client() -> connect_timeout => 5_000, max_retries => 0, pool_size => 1, - resource_opts => #{request_ttl => 5_000}, + resource_opts => #{request_ttl => 1_000}, service_account_json => RawServiceAccount }, PoolName = <<"control_connector">>, @@ -512,10 +512,16 @@ wait_acked(Opts) -> %% no need to check return value; we check the property in %% the check phase. this is just to give it a chance to do %% so and avoid flakiness. should be fast. - snabbkaffe:block_until( + Res = snabbkaffe:block_until( ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), Timeout ), + case Res of + {ok, _} -> + ok; + {timeout, Evts} -> + ct:pal("timed out waiting for acks; received:\n ~p", [Evts]) + end, ok. wait_forgotten() -> @@ -1265,11 +1271,12 @@ t_multiple_pull_workers(Config) -> <<"consumer">> => #{ %% reduce flakiness <<"ack_deadline">> => <<"10m">>, + <<"ack_retry_interval">> => <<"1s">>, <<"consumer_workers_per_topic">> => NConsumers }, <<"resource_opts">> => #{ %% reduce flakiness - <<"request_ttl">> => <<"15s">> + <<"request_ttl">> => <<"11s">> } } ), @@ -1531,11 +1538,12 @@ t_async_worker_death_mid_pull(Config) -> ct:pal("published message"), AsyncWorkerPids = get_async_worker_pids(Config), + Timeout = 20_000, emqx_utils:pmap( fun(AsyncWorkerPid) -> Ref = monitor(process, AsyncWorkerPid), ct:pal("killing pid ~p", [AsyncWorkerPid]), - sys:terminate(AsyncWorkerPid, die, 20_000), + sys:terminate(AsyncWorkerPid, die, Timeout), receive {'DOWN', Ref, process, AsyncWorkerPid, _} -> ct:pal("killed pid ~p", [AsyncWorkerPid]), @@ -1544,7 +1552,8 @@ t_async_worker_death_mid_pull(Config) -> end, ok end, - AsyncWorkerPids + AsyncWorkerPids, + Timeout + 2_000 ), ok @@ -1558,7 +1567,13 @@ t_async_worker_death_mid_pull(Config) -> ?wait_async_action( create_bridge( Config, - #{<<"pool_size">> => 1} + #{ + <<"pool_size">> => 1, + <<"consumer">> => #{ + <<"ack_deadline">> => <<"10s">>, + <<"ack_retry_interval">> => <<"1s">> + } + } ), #{?snk_kind := gcp_pubsub_consumer_worker_init}, 10_000 @@ -1888,7 +1903,10 @@ t_connection_down_during_ack(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}} + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 ), @@ -2026,7 +2044,13 @@ t_connection_down_during_pull(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( - create_bridge(Config), + create_bridge( + Config, + #{ + <<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}, + <<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>} + } + ), #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, 10_000 ),