From 28d664bae231dd980a94019fdcae1a2932b4b3dc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 23:51:54 +0100 Subject: [PATCH] test(pulsar): simplify the test suite --- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 108 ++++++------------ 1 file changed, 34 insertions(+), 74 deletions(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 704de0745..dfc5af3a7 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -14,7 +14,7 @@ -import(emqx_common_test_helpers, [on_exit/1]). -define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>). --define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). +-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). -define(RULE_TOPIC, "mqtt/rule"). -define(RULE_TOPIC_BIN, <>). @@ -52,14 +52,27 @@ only_once_tests() -> ]. init_per_suite(Config) -> - Config. + %% Ensure enterprise bridge module is loaded + _ = emqx_bridge_enterprise:module_info(), + %% TODO + %% This is needed to ensure that filenames generated deep inside pulsar/replayq + %% will not exceed 256 characters, because replayq eventually turns them into atoms. + %% The downside is increased risk of accidental name clashes / testsuite interference. + {ok, Cwd} = file:get_cwd(), + PrivDir = ?config(priv_dir, Config), + WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), + Apps = emqx_cth_suite:start( + lists:flatten([ + ?APPS, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ]), + #{work_dir => WorkDir} + ), + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), - _ = application:stop(emqx_connector), - ok. +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_group(plain = Type, Config) -> PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"), @@ -123,13 +136,6 @@ common_init_per_group() -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - %% Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_common_test_helpers:start_apps(?APPS), - {ok, _} = application:ensure_all_started(pulsar), - _ = emqx_bridge_enterprise:module_info(), - {ok, _} = application:ensure_all_started(emqx_connector), - emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer()), MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, [ @@ -210,9 +216,7 @@ pulsar_config(TestCase, _PulsarType, Config) -> PulsarTopic = ?config(pulsar_topic, Config), AuthType = proplists:get_value(sasl_auth_mechanism, Config, none), UseTLS = proplists:get_value(use_tls, Config, false), - Name = << - (atom_to_binary(TestCase))/binary, UniqueNum/binary - >>, + Name = atom_to_binary(TestCase), MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), Prefix = case UseTLS of @@ -508,51 +512,18 @@ try_decode_json(Payload) -> end. cluster(Config) -> - PrivDataDir = ?config(priv_dir, Config), - Cluster = emqx_common_test_helpers:emqx_cluster( - [core, core], + Apps = [ + {emqx, #{override_env => [{boot_modules, [broker]}]}} + | ?APPS + ], + Nodes = emqx_cth_cluster:start( [ - {apps, [emqx_conf] ++ ?APPS ++ [pulsar]}, - {listener_ports, []}, - {priv_data_dir, PrivDataDir}, - {load_schema, true}, - {start_autocluster, true}, - {schema_mod, emqx_enterprise_schema}, - {env_handler, fun - (emqx) -> - application:set_env(emqx, boot_modules, [broker]), - ok; - (emqx_conf) -> - ok; - (_) -> - ok - end} - ] - ), - ct:pal("cluster: ~p", [Cluster]), - Cluster. - -start_cluster(Cluster) -> - Nodes = - [ - emqx_common_test_helpers:start_peer(Name, Opts) - || {Name, Opts} <- Cluster + {emqx_bridge_pulsar_impl_producer1, #{apps => Apps}}, + {emqx_bridge_pulsar_impl_producer2, #{apps => Apps}} ], - NumNodes = length(Nodes), - on_exit(fun() -> - emqx_utils:pmap( - fun(N) -> - ct:pal("stopping ~p", [N]), - ok = emqx_common_test_helpers:stop_peer(N) - end, - Nodes - ) - end), - {ok, _} = snabbkaffe:block_until( - %% -1 because only those that join the first node will emit the event. - ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), - 30_000 + #{work_dir => emqx_cth_suite:work_dir(Config)} ), + ok = on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end), Nodes. kill_resource_managers() -> @@ -1105,24 +1076,13 @@ do_t_cluster(Config) -> begin MQTTTopic = ?config(mqtt_topic, Config), ResourceId = resource_id(Config), - Cluster = cluster(Config), + Nodes = [N1, N2 | _] = cluster(Config), ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), QoS = 0, Payload = emqx_guid:to_hexstr(emqx_guid:gen()), - NumNodes = length(Cluster), - {ok, SRef0} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := emqx_bridge_app_started}), - NumNodes, - 25_000 - ), - Nodes = [N1, N2 | _] = start_cluster(Cluster), - %% wait until bridge app supervisor is up; by that point, - %% `emqx_config_handler:add_handler' has been called and the node should be - %% ready to create bridges. - {ok, _} = snabbkaffe:receive_events(SRef0), {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := pulsar_producer_bridge_started}), - NumNodes, + length(Nodes), 25_000 ), {ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end), @@ -1130,7 +1090,7 @@ do_t_cluster(Config) -> erpc:multicall(Nodes, fun wait_until_producer_connected/0), {ok, _} = snabbkaffe:block_until( ?match_n_events( - NumNodes, + length(Nodes), #{?snk_kind := bridge_post_config_update_done} ), 25_000