diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index be6a306e0..b0e4e4ac8 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -34,16 +34,22 @@ init_per_suite(Config) -> emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of true -> - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([ - emqx_resource, emqx_bridge, emqx_rule_engine - ]), - {ok, _} = application:ensure_all_started(emqx_connector), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_gcp_pubsub, + emqx_bridge, + emqx_rule_engine + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), emqx_mgmt_api_test_util:init_suite(), HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr, true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), Client = start_control_client(), [ + {apps, Apps}, {proxy_name, ProxyName}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, @@ -62,12 +68,11 @@ init_per_suite(Config) -> end. end_per_suite(Config) -> + Apps = ?config(apps, Config), Client = ?config(client, Config), stop_control_client(Client), emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), - _ = application:stop(emqx_connector), + emqx_cth_suite:stop(Apps), os:unsetenv("PUBSUB_EMULATOR_HOST"), ok. @@ -1494,10 +1499,11 @@ t_pull_worker_death(Config) -> ok. t_async_worker_death_mid_pull(Config) -> - ct:timetrap({seconds, 120}), + ct:timetrap({seconds, 122}), [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), ?check_trace( + #{timetrap => 120_000}, begin start_and_subscribe_mqtt(Config), @@ -1513,23 +1519,28 @@ t_async_worker_death_mid_pull(Config) -> #{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator} ), spawn_link(fun() -> + ct:pal("will kill async workers"), ?tp_span( kill_async_worker, #{}, begin %% produce a message while worker is being killed Messages = [#{<<"data">> => Payload}], + ct:pal("publishing message"), pubsub_publish(Config, PubSubTopic, Messages), + ct:pal("published message"), AsyncWorkerPids = get_async_worker_pids(Config), emqx_utils:pmap( fun(AsyncWorkerPid) -> Ref = monitor(process, AsyncWorkerPid), - sys:terminate(AsyncWorkerPid, die), + ct:pal("killing pid ~p", [AsyncWorkerPid]), + sys:terminate(AsyncWorkerPid, die, 20_000), receive {'DOWN', Ref, process, AsyncWorkerPid, _} -> + ct:pal("killed pid ~p", [AsyncWorkerPid]), ok - after 500 -> ct:fail("async worker didn't die") + after 500 -> ct:fail("async worker ~p didn't die", [AsyncWorkerPid]) end, ok end, @@ -1538,7 +1549,8 @@ t_async_worker_death_mid_pull(Config) -> ok end - ) + ), + ct:pal("killed async workers") end), ?assertMatch(