test(pulsar): simplify the test suite

This commit is contained in:
Andrew Mayorov 2024-02-27 23:51:54 +01:00
parent 367ffa8e80
commit 28d664bae2
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 34 additions and 74 deletions

View File

@ -14,7 +14,7 @@
-import(emqx_common_test_helpers, [on_exit/1]).
-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
-define(RULE_TOPIC, "mqtt/rule").
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
@ -52,14 +52,27 @@ only_once_tests() ->
].
init_per_suite(Config) ->
Config.
%% Ensure enterprise bridge module is loaded
_ = emqx_bridge_enterprise:module_info(),
%% TODO
%% This is needed to ensure that filenames generated deep inside pulsar/replayq
%% will not exceed 256 characters, because replayq eventually turns them into atoms.
%% The downside is increased risk of accidental name clashes / testsuite interference.
{ok, Cwd} = file:get_cwd(),
PrivDir = ?config(priv_dir, Config),
WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd),
Apps = emqx_cth_suite:start(
lists:flatten([
?APPS,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
]),
#{work_dir => WorkDir}
),
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
_ = application:stop(emqx_connector),
ok.
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
init_per_group(plain = Type, Config) ->
PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"),
@ -123,13 +136,6 @@ common_init_per_group() ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
%% Ensure enterprise bridge module is loaded
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_common_test_helpers:start_apps(?APPS),
{ok, _} = application:ensure_all_started(pulsar),
_ = emqx_bridge_enterprise:module_info(),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
[
@ -210,9 +216,7 @@ pulsar_config(TestCase, _PulsarType, Config) ->
PulsarTopic = ?config(pulsar_topic, Config),
AuthType = proplists:get_value(sasl_auth_mechanism, Config, none),
UseTLS = proplists:get_value(use_tls, Config, false),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
Name = atom_to_binary(TestCase),
MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
Prefix =
case UseTLS of
@ -508,51 +512,18 @@ try_decode_json(Payload) ->
end.
cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config),
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
Apps = [
{emqx, #{override_env => [{boot_modules, [broker]}]}}
| ?APPS
],
Nodes = emqx_cth_cluster:start(
[
{apps, [emqx_conf] ++ ?APPS ++ [pulsar]},
{listener_ports, []},
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_enterprise_schema},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker]),
ok;
(emqx_conf) ->
ok;
(_) ->
ok
end}
]
),
ct:pal("cluster: ~p", [Cluster]),
Cluster.
start_cluster(Cluster) ->
Nodes =
[
emqx_common_test_helpers:start_peer(Name, Opts)
|| {Name, Opts} <- Cluster
{emqx_bridge_pulsar_impl_producer1, #{apps => Apps}},
{emqx_bridge_pulsar_impl_producer2, #{apps => Apps}}
],
NumNodes = length(Nodes),
on_exit(fun() ->
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_peer(N)
end,
Nodes
)
end),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok = on_exit(fun() -> emqx_cth_cluster:stop(Nodes) end),
Nodes.
kill_resource_managers() ->
@ -1105,24 +1076,13 @@ do_t_cluster(Config) ->
begin
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
Cluster = cluster(Config),
Nodes = [N1, N2 | _] = cluster(Config),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
QoS = 0,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
NumNodes = length(Cluster),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := emqx_bridge_app_started}),
NumNodes,
25_000
),
Nodes = [N1, N2 | _] = start_cluster(Cluster),
%% wait until bridge app supervisor is up; by that point,
%% `emqx_config_handler:add_handler' has been called and the node should be
%% ready to create bridges.
{ok, _} = snabbkaffe:receive_events(SRef0),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
NumNodes,
length(Nodes),
25_000
),
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
@ -1130,7 +1090,7 @@ do_t_cluster(Config) ->
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
{ok, _} = snabbkaffe:block_until(
?match_n_events(
NumNodes,
length(Nodes),
#{?snk_kind := bridge_post_config_update_done}
),
25_000