test: use peer module for slave and ct_slave

This commit is contained in:
Zaiming (Stone) Shi 2023-11-29 14:56:26 +01:00
parent 6f35f25163
commit 4ecfe2be30
9 changed files with 181 additions and 127 deletions

View File

@ -132,9 +132,6 @@ restart_node(Node, NodeSpec) ->
Apps = maps:get(apps, NodeSpec),
ok = erpc:call(Node, emqx_cth_suite, load_apps, [Apps]),
_ = erpc:call(Node, emqx_cth_suite, start_apps, [Apps, NodeSpec]),
%% have to re-inject this so that we may stop the node succesfully at the
%% end....
ok = emqx_cth_cluster:set_node_opts(Node, NodeSpec),
ok = snabbkaffe:forward_trace(Node),
?tp(notice, "node restarted", #{node => Node}),
?tp(restarted_node, #{}),

View File

@ -753,24 +753,15 @@ start_slave(Name, Opts) when is_map(Opts) ->
case SlaveMod of
ct_slave ->
ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]),
ct_slave:start(
Node,
[
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 20_000},
{startup_timeout, 20_000},
{erl_flags, erl_flags()},
{env, [
{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
{"EMQX_NODE__COOKIE", Cookie},
{"EMQX_NODE__DATA_DIR", NodeDataDir}
]}
]
);
Envs = [
{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
{"EMQX_NODE__COOKIE", Cookie},
{"EMQX_NODE__DATA_DIR", NodeDataDir}
],
emqx_cth_peer:start(Node, erl_flags(), Envs);
slave ->
Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_",
slave:start_link(host(), Name, ebin_path() ++ Env)
Envs = [{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}],
emqx_cth_peer:start(Node, ebin_path(), Envs)
end
end,
case DoStart() of
@ -1023,10 +1014,10 @@ set_envs(Node, Env) ->
erl_flags() ->
%% One core and redirecting logs to master
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
["+S", "1:1", "-master", atom_to_list(node())] ++ ebin_path().
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
["-pa" | lists:filter(fun is_lib/1, code:get_path())].
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch andalso

View File

@ -45,7 +45,7 @@
-export([share_load_module/2]).
-export([node_name/1, mk_nodespecs/2]).
-export([start_apps/2, set_node_opts/2]).
-export([start_apps/2]).
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
@ -111,7 +111,7 @@ start(Nodes, ClusterOpts) ->
NodeSpecs = mk_nodespecs(Nodes, ClusterOpts),
ct:pal("Starting cluster:\n ~p", [NodeSpecs]),
% 1. Start bare nodes with only basic applications running
_ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS),
ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),
% 2. Start applications needed to enable clustering
% Generally, this causes some applications to restart, but we deliberately don't
% start them yet.
@ -282,8 +282,41 @@ allocate_listener_port(Type, #{base_port := BasePort}) ->
allocate_listener_ports(Types, Spec) ->
lists:foldl(fun maps:merge/2, #{}, [allocate_listener_port(Type, Spec) || Type <- Types]).
start_node_init(Spec = #{name := Node}) ->
Node = start_bare_node(Node, Spec),
start_nodes_init(Specs, Timeout) ->
Args = erl_flags(),
Envs = [],
Waits = lists:map(
fun(#{name := NodeName}) ->
WaitTag = {boot_complete, make_ref()},
WaitBoot = {self(), WaitTag},
{ok, NodeName} = emqx_cth_peer:start(NodeName, Args, Envs, WaitBoot),
WaitTag
end,
Specs
),
Deadline = erlang:monotonic_time() + erlang:convert_time_unit(Timeout, millisecond, nanosecond),
ok = wait_boot_complete(Waits, Deadline),
lists:foreach(fun(#{name := Node}) -> node_init(Node) end, Specs).
wait_boot_complete([], _) ->
ok;
wait_boot_complete(Waits, Deadline) ->
case erlang:monotonic_time() > Deadline of
true ->
error({timeout, Waits});
false ->
ok
end,
receive
{{boot_complete, _Ref} = Wait, {started, _NodeName, _Pid}} ->
wait_boot_complete(Waits -- [Wait], Deadline);
{{boot_complete, _Ref}, Otherwise} ->
error({unexpected, Otherwise})
after 100 ->
wait_boot_complete(Waits, Deadline)
end.
node_init(Node) ->
% Make it possible to call `ct:pal` and friends (if running under rebar3)
_ = share_load_module(Node, cthr),
% Enable snabbkaffe trace forwarding
@ -300,12 +333,6 @@ run_node_phase_apps(Spec = #{name := Node}) ->
ok = start_apps(Node, Spec),
ok.
set_node_opts(Node, Spec) ->
erpc:call(Node, persistent_term, put, [{?MODULE, opts}, Spec]).
get_node_opts(Node) ->
erpc:call(Node, persistent_term, get, [{?MODULE, opts}]).
load_apps(Node, #{apps := Apps}) ->
erpc:call(Node, emqx_cth_suite, load_apps, [Apps]).
@ -352,23 +379,7 @@ stop(Nodes) ->
stop_node(Name) ->
Node = node_name(Name),
try get_node_opts(Node) of
Opts ->
stop_node(Name, Opts)
catch
error:{erpc, _} ->
ok
end.
stop_node(Node, #{driver := ct_slave}) ->
case ct_slave:stop(Node, [{stop_timeout, ?TIMEOUT_NODE_STOP_S}]) of
{ok, _} ->
ok;
{error, Reason, _} when Reason == not_connected; Reason == not_started ->
ok
end;
stop_node(Node, #{driver := slave}) ->
slave:stop(Node).
ok = emqx_cth_peer:stop(Node).
%% Ports
@ -392,35 +403,22 @@ listener_port(BasePort, wss) ->
%%
-spec start_bare_node(atom(), map()) -> node().
start_bare_node(Name, Spec = #{driver := ct_slave}) ->
{ok, Node} = ct_slave:start(
node_name(Name),
[
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 20_000},
{startup_timeout, 20_000},
{erl_flags, erl_flags()},
{env, []}
]
),
init_bare_node(Node, Spec);
start_bare_node(Name, Spec = #{driver := slave}) ->
{ok, Node} = slave:start_link(host(), Name, ebin_path()),
init_bare_node(Node, Spec).
start_bare_node(Name, Spec) ->
Args = erl_flags(),
Envs = [],
{ok, NodeName} = emqx_cth_peer:start(Name, Args, Envs, ?TIMEOUT_NODE_START_MS),
init_bare_node(NodeName, Spec).
init_bare_node(Node, Spec) ->
init_bare_node(Node, _Spec) ->
pong = net_adm:ping(Node),
% Preserve node spec right on the remote node
ok = set_node_opts(Node, Spec),
Node.
erl_flags() ->
%% One core and redirecting logs to master
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
["+S", "1:1", "-master", atom_to_list(node())] ++ ebin_path().
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
["-pa" | lists:filter(fun is_lib/1, code:get_path())].
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch andalso

View File

@ -0,0 +1,79 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Common Test Helper proxy module for slave -> peer migration.
%% OTP 26 has slave module deprecated, use peer instead.
-module(emqx_cth_peer).
-export([start/2, start/3, start/4]).
-export([start_link/2, start_link/3, start_link/4]).
-export([stop/1]).
start(Name, Args) ->
start(Name, Args, []).
start(Name, Args, Envs) ->
start(Name, Args, Envs, timer:seconds(20)).
start(Name, Args, Envs, Timeout) when is_atom(Name) ->
do_start(Name, Args, Envs, Timeout, start).
start_link(Name, Args) ->
start_link(Name, Args, []).
start_link(Name, Args, Envs) ->
start_link(Name, Args, Envs, timer:seconds(20)).
start_link(Name, Args, Envs, Timeout) when is_atom(Name) ->
do_start(Name, Args, Envs, Timeout, start_link).
do_start(Name0, Args, Envs, Timeout, Func) when is_atom(Name0) ->
{Name, Host} = parse_node_name(Name0),
{ok, Pid, Node} = peer:Func(#{
name => Name,
host => Host,
args => Args,
env => Envs,
wait_boot => Timeout,
longnames => true,
shutdown => {halt, 1000}
}),
true = register(Node, Pid),
{ok, Node}.
stop(Node) when is_atom(Node) ->
Pid = whereis(Node),
case is_pid(Pid) of
true ->
unlink(Pid),
ok = peer:stop(Pid);
false ->
ct:pal("The control process for node ~p is unexpetedly down", [Node]),
ok
end.
parse_node_name(NodeName) ->
case string:tokens(atom_to_list(NodeName), "@") of
[Name, Host] ->
{list_to_atom(Name), Host};
[_] ->
{NodeName, host()}
end.
host() ->
[_Name, Host] = string:tokens(atom_to_list(node()), "@"),
Host.

View File

@ -58,9 +58,6 @@ t_mount_share(_) ->
TopicFilters = [T],
?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
%% should not mount share topic when make message.
Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
?assertEqual(
TopicFilter,
mount(undefined, TopicFilter)
@ -89,8 +86,6 @@ t_unmount_share(_) ->
?assertEqual(TopicFilter, #share{group = <<"group">>, topic = <<"topic">>}),
%% should not unmount share topic when make message.
Msg = emqx_message:make(<<"clientid">>, TopicFilter, <<"payload">>),
?assertEqual(
TopicFilter,
unmount(undefined, TopicFilter)

View File

@ -233,7 +233,7 @@ t_session_subscription_iterators(Config) ->
),
ok.
t_qos0(Config) ->
t_qos0(_Config) ->
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
Pub = connect(<<?MODULE_STRING "2">>, true, 0),
try
@ -258,7 +258,7 @@ t_qos0(Config) ->
emqtt:stop(Pub)
end.
t_publish_as_persistent(Config) ->
t_publish_as_persistent(_Config) ->
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
Pub = connect(<<?MODULE_STRING "2">>, true, 30),
try

View File

@ -218,38 +218,41 @@ t_routing_schema_switch(VFrom, VTo, Config) ->
],
#{work_dir => WorkDir}
),
% Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively
Nodes = [Node1, Node2, Node3],
?assertEqual(
[{ok, VTo}, {ok, VTo}, {ok, VTo}],
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
),
% Wait for all nodes to agree on cluster state
?retry(
500,
10,
?assertMatch(
[{ok, [Node1, Node2, Node3]}],
lists:usort(erpc:multicall(Nodes, emqx, running_nodes, []))
)
),
% Verify that routing works as expected
C2 = start_client(Node2),
ok = subscribe(C2, <<"a/+/d">>),
C3 = start_client(Node3),
ok = subscribe(C3, <<"d/e/f/#">>),
{ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>),
{ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>),
{ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>),
?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}),
?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}),
?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
?assertNotReceive(_),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3),
ok = emqx_cth_cluster:stop(Nodes).
try
% Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively
?assertEqual(
[{ok, VTo}, {ok, VTo}, {ok, VTo}],
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
),
% Wait for all nodes to agree on cluster state
?retry(
500,
10,
?assertMatch(
[{ok, [Node1, Node2, Node3]}],
lists:usort(erpc:multicall(Nodes, emqx, running_nodes, []))
)
),
% Verify that routing works as expected
C2 = start_client(Node2),
ok = subscribe(C2, <<"a/+/d">>),
C3 = start_client(Node3),
ok = subscribe(C3, <<"d/e/f/#">>),
{ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>),
{ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>),
{ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>),
?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}),
?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}),
?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
?assertNotReceive(_),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3)
after
ok = emqx_cth_cluster:stop(Nodes)
end.
%%

View File

@ -63,6 +63,7 @@ init_per_suite(Config) ->
end,
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
emqx_logger:set_log_level(debug),
[{dist_pid, DistPid} | Config].
end_per_suite(Config) ->
@ -574,7 +575,7 @@ t_local(Config) when is_list(Config) ->
<<"sticky_group">> => sticky
},
Node = start_slave('local_shared_sub_testtesttest', 21999),
Node = start_slave('local_shared_sub_local_1', 21999),
ok = ensure_group_config(GroupConfig),
ok = ensure_group_config(Node, GroupConfig),
@ -627,7 +628,7 @@ t_remote(Config) when is_list(Config) ->
<<"sticky_group">> => sticky
},
Node = start_slave('remote_shared_sub_testtesttest', 21999),
Node = start_slave('remote_shared_sub_remote_1', 21999),
ok = ensure_group_config(GroupConfig),
ok = ensure_group_config(Node, GroupConfig),
@ -676,7 +677,7 @@ t_local_fallback(Config) when is_list(Config) ->
Topic = <<"local_foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_test', 11888),
Node = start_slave('local_fallback_shared_sub_1', 11888),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1),
@ -1253,34 +1254,24 @@ recv_msgs(Count, Msgs) ->
end.
start_slave(Name, Port) ->
{ok, Node} = ct_slave:start(
list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}
]
{ok, Node} = emqx_cth_peer:start_link(
Name,
ebin_path()
),
pong = net_adm:ping(Node),
setup_node(Node, Port),
Node.
stop_slave(Node) ->
rpc:call(Node, mria, leave, []),
ct_slave:stop(Node).
emqx_cth_peer:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.
["-pa" | code:get_path()].
setup_node(Node, Port) ->
EnvHandler =

View File

@ -869,7 +869,7 @@ stop_slave(Node) ->
% This line don't work!!
%emqx_cluster_rpc:fast_forward_to_commit(Node, 100),
rpc:call(Node, ?MODULE, leave_cluster, []),
ok = slave:stop(Node),
ok = emqx_cth_peer:stop(Node),
?assertEqual([node()], mria:running_nodes()),
?assertEqual([], nodes()),
_ = application:stop(mria),