From 4ecfe2be30a339369ae2c1f202a4aa7f9acc34dc Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 29 Nov 2023 14:56:26 +0100 Subject: [PATCH] test: use peer module for slave and ct_slave --- .../emqx_persistent_session_ds_SUITE.erl | 3 - apps/emqx/test/emqx_common_test_helpers.erl | 29 ++---- apps/emqx/test/emqx_cth_cluster.erl | 94 +++++++++---------- apps/emqx/test/emqx_cth_peer.erl | 79 ++++++++++++++++ apps/emqx/test/emqx_mountpoint_SUITE.erl | 5 - .../test/emqx_persistent_messages_SUITE.erl | 4 +- apps/emqx/test/emqx_routing_SUITE.erl | 65 +++++++------ apps/emqx/test/emqx_shared_sub_SUITE.erl | 27 ++---- .../test/emqx_telemetry_SUITE.erl | 2 +- 9 files changed, 181 insertions(+), 127 deletions(-) create mode 100644 apps/emqx/test/emqx_cth_peer.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 05c1eb8f2..265ec02b9 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -132,9 +132,6 @@ restart_node(Node, NodeSpec) -> Apps = maps:get(apps, NodeSpec), ok = erpc:call(Node, emqx_cth_suite, load_apps, [Apps]), _ = erpc:call(Node, emqx_cth_suite, start_apps, [Apps, NodeSpec]), - %% have to re-inject this so that we may stop the node succesfully at the - %% end.... 
- ok = emqx_cth_cluster:set_node_opts(Node, NodeSpec), ok = snabbkaffe:forward_trace(Node), ?tp(notice, "node restarted", #{node => Node}), ?tp(restarted_node, #{}), diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 4671851f8..c97c72640 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -753,24 +753,15 @@ start_slave(Name, Opts) when is_map(Opts) -> case SlaveMod of ct_slave -> ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]), - ct_slave:start( - Node, - [ - {kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 20_000}, - {startup_timeout, 20_000}, - {erl_flags, erl_flags()}, - {env, [ - {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}, - {"EMQX_NODE__COOKIE", Cookie}, - {"EMQX_NODE__DATA_DIR", NodeDataDir} - ]} - ] - ); + Envs = [ + {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}, + {"EMQX_NODE__COOKIE", Cookie}, + {"EMQX_NODE__DATA_DIR", NodeDataDir} + ], + emqx_cth_peer:start(Node, erl_flags(), Envs); slave -> - Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_", - slave:start_link(host(), Name, ebin_path() ++ Env) + Envs = [{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}], + emqx_cth_peer:start(Node, ebin_path(), Envs) end end, case DoStart() of @@ -1023,10 +1014,10 @@ set_envs(Node, Env) -> erl_flags() -> %% One core and redirecting logs to master - "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path(). + ["+S", "1:1", "-master", atom_to_list(node())] ++ ebin_path(). ebin_path() -> - string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + ["-pa" | lists:filter(fun is_lib/1, code:get_path())]. is_lib(Path) -> string:prefix(Path, code:lib_dir()) =:= nomatch andalso diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index b41586518..cbe21cbed 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -45,7 +45,7 @@ -export([share_load_module/2]). 
-export([node_name/1, mk_nodespecs/2]). --export([start_apps/2, set_node_opts/2]). +-export([start_apps/2]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -111,7 +111,7 @@ start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), ct:pal("Starting cluster:\n ~p", [NodeSpecs]), % 1. Start bare nodes with only basic applications running - _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS), + ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS), % 2. Start applications needed to enable clustering % Generally, this causes some applications to restart, but we deliberately don't % start them yet. @@ -282,8 +282,41 @@ allocate_listener_port(Type, #{base_port := BasePort}) -> allocate_listener_ports(Types, Spec) -> lists:foldl(fun maps:merge/2, #{}, [allocate_listener_port(Type, Spec) || Type <- Types]). -start_node_init(Spec = #{name := Node}) -> - Node = start_bare_node(Node, Spec), +start_nodes_init(Specs, Timeout) -> + Args = erl_flags(), + Envs = [], + Waits = lists:map( + fun(#{name := NodeName}) -> + WaitTag = {boot_complete, make_ref()}, + WaitBoot = {self(), WaitTag}, + {ok, NodeName} = emqx_cth_peer:start(NodeName, Args, Envs, WaitBoot), + WaitTag + end, + Specs + ), + Deadline = erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, native), + ok = wait_boot_complete(Waits, Deadline), + lists:foreach(fun(#{name := Node}) -> node_init(Node) end, Specs). + +wait_boot_complete([], _) -> + ok; +wait_boot_complete(Waits, Deadline) -> + case erlang:monotonic_time() > Deadline of + true -> + error({timeout, Waits}); + false -> + ok + end, + receive + {{boot_complete, _Ref} = Wait, {started, _NodeName, _Pid}} -> + wait_boot_complete(Waits -- [Wait], Deadline); + {{boot_complete, _Ref}, Otherwise} -> + error({unexpected, Otherwise}) + after 100 -> + wait_boot_complete(Waits, Deadline) + end. 
+ +node_init(Node) -> % Make it possible to call `ct:pal` and friends (if running under rebar3) _ = share_load_module(Node, cthr), % Enable snabbkaffe trace forwarding @@ -300,12 +333,6 @@ run_node_phase_apps(Spec = #{name := Node}) -> ok = start_apps(Node, Spec), ok. -set_node_opts(Node, Spec) -> - erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]). - -get_node_opts(Node) -> - erpc:call(Node, persistent_term, get, [{?MODULE, opts}]). - load_apps(Node, #{apps := Apps}) -> erpc:call(Node, emqx_cth_suite, load_apps, [Apps]). @@ -352,23 +379,7 @@ stop(Nodes) -> stop_node(Name) -> Node = node_name(Name), - try get_node_opts(Node) of - Opts -> - stop_node(Name, Opts) - catch - error:{erpc, _} -> - ok - end. - -stop_node(Node, #{driver := ct_slave}) -> - case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of - {ok, _} -> - ok; - {error, Reason, _} when Reason == not_connected; Reason == not_started -> - ok - end; -stop_node(Node, #{driver := slave}) -> - slave:stop(Node). + ok = emqx_cth_peer:stop(Node). %% Ports @@ -392,35 +403,22 @@ listener_port(BasePort, wss) -> %% -spec start_bare_node(atom(), map()) -> node(). -start_bare_node(Name, Spec = #{driver := ct_slave}) -> - {ok, Node} = ct_slave:start( - node_name(Name), - [ - {kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 20_000}, - {startup_timeout, 20_000}, - {erl_flags, erl_flags()}, - {env, []} - ] - ), - init_bare_node(Node, Spec); -start_bare_node(Name, Spec = #{driver := slave}) -> - {ok, Node} = slave:start_link(host(), Name, ebin_path()), - init_bare_node(Node, Spec). +start_bare_node(Name, Spec) -> + Args = erl_flags(), + Envs = [], + {ok, NodeName} = emqx_cth_peer:start(Name, Args, Envs, ?TIMEOUT_NODE_START_MS), + init_bare_node(NodeName, Spec). -init_bare_node(Node, Spec) -> +init_bare_node(Node, _Spec) -> pong = net_adm:ping(Node), - % Preserve node spec right on the remote node - ok = set_node_opts(Node, Spec), Node. 
erl_flags() -> %% One core and redirecting logs to master - "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path(). + ["+S", "1:1", "-master", atom_to_list(node())] ++ ebin_path(). ebin_path() -> - string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + ["-pa" | lists:filter(fun is_lib/1, code:get_path())]. is_lib(Path) -> string:prefix(Path, code:lib_dir()) =:= nomatch andalso diff --git a/apps/emqx/test/emqx_cth_peer.erl b/apps/emqx/test/emqx_cth_peer.erl new file mode 100644 index 000000000..8b1996cbd --- /dev/null +++ b/apps/emqx/test/emqx_cth_peer.erl @@ -0,0 +1,79 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Common Test Helper proxy module for slave -> peer migration. +%% OTP 26 has slave module deprecated, use peer instead. + +-module(emqx_cth_peer). + +-export([start/2, start/3, start/4]). +-export([start_link/2, start_link/3, start_link/4]). +-export([stop/1]). + +start(Name, Args) -> + start(Name, Args, []). + +start(Name, Args, Envs) -> + start(Name, Args, Envs, timer:seconds(20)). + +start(Name, Args, Envs, Timeout) when is_atom(Name) -> + do_start(Name, Args, Envs, Timeout, start). + +start_link(Name, Args) -> + start_link(Name, Args, []). 
+ +start_link(Name, Args, Envs) -> + start_link(Name, Args, Envs, timer:seconds(20)). + +start_link(Name, Args, Envs, Timeout) when is_atom(Name) -> + do_start(Name, Args, Envs, Timeout, start_link). + +do_start(Name0, Args, Envs, Timeout, Func) when is_atom(Name0) -> + {Name, Host} = parse_node_name(Name0), + {ok, Pid, Node} = peer:Func(#{ + name => Name, + host => Host, + args => Args, + env => Envs, + wait_boot => Timeout, + longnames => true, + shutdown => {halt, 1000} + }), + true = register(Node, Pid), + {ok, Node}. + +stop(Node) when is_atom(Node) -> + Pid = whereis(Node), + case is_pid(Pid) of + true -> + unlink(Pid), + ok = peer:stop(Pid); + false -> + ct:pal("The control process for node ~p is unexpectedly down", [Node]), + ok + end. + +parse_node_name(NodeName) -> + case string:lexemes(atom_to_list(NodeName), "@") of + [Name, Host] -> + {list_to_atom(Name), Host}; + [_] -> + {NodeName, host()} + end. + +host() -> + [_Name, Host] = string:lexemes(atom_to_list(node()), "@"), + Host. diff --git a/apps/emqx/test/emqx_mountpoint_SUITE.erl b/apps/emqx/test/emqx_mountpoint_SUITE.erl index 0bfde981c..1d9539409 100644 --- a/apps/emqx/test/emqx_mountpoint_SUITE.erl +++ b/apps/emqx/test/emqx_mountpoint_SUITE.erl @@ -58,9 +58,6 @@ t_mount_share(_) -> TopicFilters = [T], ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), - %% should not mount share topic when make message. - Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), - ?assertEqual( TopicFilter, mount(undefined, TopicFilter) @@ -89,8 +86,6 @@ t_unmount_share(_) -> ?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}), - %% should not unmount share topic when make message. 
- Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>), ?assertEqual( TopicFilter, unmount(undefined, TopicFilter) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index f8f7baaf1..ea5f9f7bc 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -233,7 +233,7 @@ t_session_subscription_iterators(Config) -> ), ok. -t_qos0(Config) -> +t_qos0(_Config) -> Sub = connect(<>, true, 30), Pub = connect(<>, true, 0), try @@ -258,7 +258,7 @@ t_qos0(Config) -> emqtt:stop(Pub) end. -t_publish_as_persistent(Config) -> +t_publish_as_persistent(_Config) -> Sub = connect(<>, true, 30), Pub = connect(<>, true, 30), try diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index a54e1b4dd..c9ad63cf1 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -218,38 +218,41 @@ t_routing_schema_switch(VFrom, VTo, Config) -> ], #{work_dir => WorkDir} ), - % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively Nodes = [Node1, Node2, Node3], - ?assertEqual( - [{ok, VTo}, {ok, VTo}, {ok, VTo}], - erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) - ), - % Wait for all nodes to agree on cluster state - ?retry( - 500, - 10, - ?assertMatch( - [{ok, [Node1, Node2, Node3]}], - lists:usort(erpc:multicall(Nodes, emqx, running_nodes, [])) - ) - ), - % Verify that routing works as expected - C2 = start_client(Node2), - ok = subscribe(C2, <<"a/+/d">>), - C3 = start_client(Node3), - ok = subscribe(C3, <<"d/e/f/#">>), - {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>), - {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>), - {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>), - ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}), - ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}), - 
?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), - ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), - ?assertNotReceive(_), - ok = emqtt:stop(C1), - ok = emqtt:stop(C2), - ok = emqtt:stop(C3), - ok = emqx_cth_cluster:stop(Nodes). + try + % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively + ?assertEqual( + [{ok, VTo}, {ok, VTo}, {ok, VTo}], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertMatch( + [{ok, [Node1, Node2, Node3]}], + lists:usort(erpc:multicall(Nodes, emqx, running_nodes, [])) + ) + ), + % Verify that routing works as expected + C2 = start_client(Node2), + ok = subscribe(C2, <<"a/+/d">>), + C3 = start_client(Node3), + ok = subscribe(C3, <<"d/e/f/#">>), + {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>), + {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>), + {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>), + ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}), + ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}), + ?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertNotReceive(_), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + after + ok = emqx_cth_cluster:stop(Nodes) + end. %% diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 86887eff0..cc6908fb6 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -63,6 +63,7 @@ init_per_suite(Config) -> end, emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), + emqx_logger:set_log_level(debug), [{dist_pid, DistPid} | Config]. 
end_per_suite(Config) -> @@ -574,7 +575,7 @@ t_local(Config) when is_list(Config) -> <<"sticky_group">> => sticky }, - Node = start_slave('local_shared_sub_testtesttest', 21999), + Node = start_slave('local_shared_sub_local_1', 21999), ok = ensure_group_config(GroupConfig), ok = ensure_group_config(Node, GroupConfig), @@ -627,7 +628,7 @@ t_remote(Config) when is_list(Config) -> <<"sticky_group">> => sticky }, - Node = start_slave('remote_shared_sub_testtesttest', 21999), + Node = start_slave('remote_shared_sub_remote_1', 21999), ok = ensure_group_config(GroupConfig), ok = ensure_group_config(Node, GroupConfig), @@ -676,7 +677,7 @@ t_local_fallback(Config) when is_list(Config) -> Topic = <<"local_foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - Node = start_slave('local_fallback_shared_sub_test', 11888), + Node = start_slave('local_fallback_shared_sub_1', 11888), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, _} = emqtt:connect(ConnPid1), @@ -1253,34 +1254,24 @@ recv_msgs(Count, Msgs) -> end. start_slave(Name, Port) -> - {ok, Node} = ct_slave:start( - list_to_atom(atom_to_list(Name) ++ "@" ++ host()), - [ - {kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 10000}, - {startup_timeout, 10000}, - {erl_flags, ebin_path()} - ] + {ok, Node} = emqx_cth_peer:start_link( + Name, + ebin_path() ), - pong = net_adm:ping(Node), setup_node(Node, Port), Node. stop_slave(Node) -> rpc:call(Node, mria, leave, []), - ct_slave:stop(Node). + emqx_cth_peer:stop(Node). host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. ebin_path() -> - string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). - -is_lib(Path) -> - string:prefix(Path, code:lib_dir()) =:= nomatch. + ["-pa" | code:get_path()]. 
setup_node(Node, Port) -> EnvHandler = diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl index 07cb18e60..f1da349eb 100644 --- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl @@ -869,7 +869,7 @@ stop_slave(Node) -> % This line don't work!! %emqx_cluster_rpc:fast_forward_to_commit(Node, 100), rpc:call(Node, ?MODULE, leave_cluster, []), - ok = slave:stop(Node), + ok = emqx_cth_peer:stop(Node), ?assertEqual([node()], mria:running_nodes()), ?assertEqual([], nodes()), _ = application:stop(mria),