Merge pull request #12006 from thalesmg/test-debug-flaky-test-r53-20231122
test(gcp_pubsub_consumer): fix flaky test
This commit is contained in:
commit
8e8d6d2192
|
@ -34,16 +34,22 @@ init_per_suite(Config) ->
|
|||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of
|
||||
true ->
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps([
|
||||
emqx_resource, emqx_bridge, emqx_rule_engine
|
||||
]),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_bridge_gcp_pubsub,
|
||||
emqx_bridge,
|
||||
emqx_rule_engine
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr,
|
||||
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
|
||||
Client = start_control_client(),
|
||||
[
|
||||
{apps, Apps},
|
||||
{proxy_name, ProxyName},
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort},
|
||||
|
@ -62,12 +68,11 @@ init_per_suite(Config) ->
|
|||
end.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(apps, Config),
|
||||
Client = ?config(client, Config),
|
||||
stop_control_client(Client),
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
|
||||
_ = application:stop(emqx_connector),
|
||||
emqx_cth_suite:stop(Apps),
|
||||
os:unsetenv("PUBSUB_EMULATOR_HOST"),
|
||||
ok.
|
||||
|
||||
|
@ -1494,10 +1499,11 @@ t_pull_worker_death(Config) ->
|
|||
ok.
|
||||
|
||||
t_async_worker_death_mid_pull(Config) ->
|
||||
ct:timetrap({seconds, 120}),
|
||||
ct:timetrap({seconds, 122}),
|
||||
[#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config),
|
||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
?check_trace(
|
||||
#{timetrap => 120_000},
|
||||
begin
|
||||
start_and_subscribe_mqtt(Config),
|
||||
|
||||
|
@ -1513,23 +1519,28 @@ t_async_worker_death_mid_pull(Config) ->
|
|||
#{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator}
|
||||
),
|
||||
spawn_link(fun() ->
|
||||
ct:pal("will kill async workers"),
|
||||
?tp_span(
|
||||
kill_async_worker,
|
||||
#{},
|
||||
begin
|
||||
%% produce a message while worker is being killed
|
||||
Messages = [#{<<"data">> => Payload}],
|
||||
ct:pal("publishing message"),
|
||||
pubsub_publish(Config, PubSubTopic, Messages),
|
||||
ct:pal("published message"),
|
||||
|
||||
AsyncWorkerPids = get_async_worker_pids(Config),
|
||||
emqx_utils:pmap(
|
||||
fun(AsyncWorkerPid) ->
|
||||
Ref = monitor(process, AsyncWorkerPid),
|
||||
sys:terminate(AsyncWorkerPid, die),
|
||||
ct:pal("killing pid ~p", [AsyncWorkerPid]),
|
||||
sys:terminate(AsyncWorkerPid, die, 20_000),
|
||||
receive
|
||||
{'DOWN', Ref, process, AsyncWorkerPid, _} ->
|
||||
ct:pal("killed pid ~p", [AsyncWorkerPid]),
|
||||
ok
|
||||
after 500 -> ct:fail("async worker didn't die")
|
||||
after 500 -> ct:fail("async worker ~p didn't die", [AsyncWorkerPid])
|
||||
end,
|
||||
ok
|
||||
end,
|
||||
|
@ -1538,7 +1549,8 @@ t_async_worker_death_mid_pull(Config) ->
|
|||
|
||||
ok
|
||||
end
|
||||
)
|
||||
),
|
||||
ct:pal("killed async workers")
|
||||
end),
|
||||
|
||||
?assertMatch(
|
||||
|
|
Loading…
Reference in New Issue