test: replace 'slave' and 'ct_slave' with 'peer'

This commit is contained in:
Zaiming (Stone) Shi 2023-12-01 08:07:09 +01:00
parent 0d245acdc1
commit 22f7cc1622
17 changed files with 84 additions and 139 deletions

View File

@ -70,8 +70,8 @@
emqx_cluster/2, emqx_cluster/2,
start_ekka/0, start_ekka/0,
start_epmd/0, start_epmd/0,
start_slave/2, start_peer/2,
stop_slave/1, stop_peer/1,
listener_port/2 listener_port/2
]). ]).
@ -734,13 +734,11 @@ emqx_cluster(Specs0, CommonOpts) ->
%% Lower level starting API %% Lower level starting API
-spec start_slave(shortname(), node_opts()) -> nodename(). -spec start_peer(shortname(), node_opts()) -> nodename().
start_slave(Name, Opts) when is_list(Opts) -> start_peer(Name, Opts) when is_list(Opts) ->
start_slave(Name, maps:from_list(Opts)); start_peer(Name, maps:from_list(Opts));
start_slave(Name, Opts) when is_map(Opts) -> start_peer(Name, Opts) when is_map(Opts) ->
SlaveMod = maps:get(peer_mod, Opts, ct_slave),
Node = node_name(Name), Node = node_name(Name),
put_peer_mod(Node, SlaveMod),
Cookie = atom_to_list(erlang:get_cookie()), Cookie = atom_to_list(erlang:get_cookie()),
PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"), PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
NodeDataDir = filename:join([ NodeDataDir = filename:join([
@ -750,19 +748,13 @@ start_slave(Name, Opts) when is_map(Opts) ->
]), ]),
DoStart = DoStart =
fun() -> fun() ->
case SlaveMod of ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]),
ct_slave -> Envs = [
ct:pal("~p: node data dir: ~s", [Node, NodeDataDir]), {"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
Envs = [ {"EMQX_NODE__COOKIE", Cookie},
{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}, {"EMQX_NODE__DATA_DIR", NodeDataDir}
{"EMQX_NODE__COOKIE", Cookie}, ],
{"EMQX_NODE__DATA_DIR", NodeDataDir} emqx_cth_peer:start(Node, erl_flags(), Envs)
],
emqx_cth_peer:start(Node, erl_flags(), Envs);
slave ->
Envs = [{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"}],
emqx_cth_peer:start(Node, ebin_path(), Envs)
end
end, end,
case DoStart() of case DoStart() of
{ok, _} -> {ok, _} ->
@ -778,7 +770,7 @@ start_slave(Name, Opts) when is_map(Opts) ->
Node. Node.
%% Node stopping %% Node stopping
stop_slave(Node0) -> stop_peer(Node0) ->
Node = node_name(Node0), Node = node_name(Node0),
emqx_cth_peer:stop(Node). emqx_cth_peer:stop(Node).
@ -939,7 +931,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
ignore -> ignore ->
ok; ok;
Err -> Err ->
stop_slave(Node), stop_peer(Node),
error({failed_to_join_cluster, #{node => Node, error => Err}}) error({failed_to_join_cluster, #{node => Node, error => Err}})
end end
end, end,
@ -956,19 +948,6 @@ set_env_once(Var, Value) ->
end, end,
ok. ok.
put_peer_mod(Node, SlaveMod) ->
put({?MODULE, Node}, SlaveMod),
ok.
get_peer_mod(Node) ->
case get({?MODULE, Node}) of
undefined -> ct_slave;
SlaveMod -> SlaveMod
end.
erase_peer_mod(Node) ->
erase({?MODULE, Node}).
node_name(Name) -> node_name(Name) ->
case string:tokens(atom_to_list(Name), "@") of case string:tokens(atom_to_list(Name), "@") of
[_Name, _Host] -> [_Name, _Host] ->

View File

@ -91,11 +91,7 @@
%% Working directory %% Working directory
%% If this directory is not empty, starting up the node applications will fail %% If this directory is not empty, starting up the node applications will fail
%% Default: "${ClusterOpts.work_dir}/${nodename}" %% Default: "${ClusterOpts.work_dir}/${nodename}"
work_dir => file:name(), work_dir => file:name()
% Tooling to manage nodes
% Default: `ct_slave`.
driver => ct_slave | slave
}}. }}.
-spec start([nodespec()], ClusterOpts) -> -spec start([nodespec()], ClusterOpts) ->
@ -162,8 +158,7 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) ->
role => core, role => core,
apps => [], apps => [],
base_port => BasePort, base_port => BasePort,
work_dir => filename:join([WorkDir, Node]), work_dir => filename:join([WorkDir, Node])
driver => ct_slave
}, },
maps:merge(Defaults, NodeOpts). maps:merge(Defaults, NodeOpts).

View File

@ -575,7 +575,7 @@ t_local(Config) when is_list(Config) ->
<<"sticky_group">> => sticky <<"sticky_group">> => sticky
}, },
Node = start_slave('local_shared_sub_local_1', 21999), Node = start_peer('local_shared_sub_local_1', 21999),
ok = ensure_group_config(GroupConfig), ok = ensure_group_config(GroupConfig),
ok = ensure_group_config(Node, GroupConfig), ok = ensure_group_config(Node, GroupConfig),
@ -606,7 +606,7 @@ t_local(Config) when is_list(Config) ->
emqtt:stop(ConnPid1), emqtt:stop(ConnPid1),
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
stop_slave(Node), stop_peer(Node),
?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)), ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
?assertEqual(local, RemoteLocalGroupStrategy), ?assertEqual(local, RemoteLocalGroupStrategy),
@ -628,7 +628,7 @@ t_remote(Config) when is_list(Config) ->
<<"sticky_group">> => sticky <<"sticky_group">> => sticky
}, },
Node = start_slave('remote_shared_sub_remote_1', 21999), Node = start_peer('remote_shared_sub_remote_1', 21999),
ok = ensure_group_config(GroupConfig), ok = ensure_group_config(GroupConfig),
ok = ensure_group_config(Node, GroupConfig), ok = ensure_group_config(Node, GroupConfig),
@ -664,7 +664,7 @@ t_remote(Config) when is_list(Config) ->
after after
emqtt:stop(ConnPidLocal), emqtt:stop(ConnPidLocal),
emqtt:stop(ConnPidRemote), emqtt:stop(ConnPidRemote),
stop_slave(Node) stop_peer(Node)
end. end.
t_local_fallback(Config) when is_list(Config) -> t_local_fallback(Config) when is_list(Config) ->
@ -677,7 +677,7 @@ t_local_fallback(Config) when is_list(Config) ->
Topic = <<"local_foo/bar">>, Topic = <<"local_foo/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_1', 11888), Node = start_peer('local_fallback_shared_sub_1', 11888),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid1),
@ -693,7 +693,7 @@ t_local_fallback(Config) when is_list(Config) ->
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1], 2_000), {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1], 2_000),
emqtt:stop(ConnPid1), emqtt:stop(ConnPid1),
stop_slave(Node), stop_peer(Node),
?assertEqual(UsedSubPid1, UsedSubPid2), ?assertEqual(UsedSubPid1, UsedSubPid2),
ok. ok.
@ -1253,7 +1253,7 @@ recv_msgs(Count, Msgs) ->
Msgs Msgs
end. end.
start_slave(Name, Port) -> start_peer(Name, Port) ->
{ok, Node} = emqx_cth_peer:start_link( {ok, Node} = emqx_cth_peer:start_link(
Name, Name,
ebin_path() ebin_path()
@ -1262,7 +1262,7 @@ start_slave(Name, Port) ->
setup_node(Node, Port), setup_node(Node, Port),
Node. Node.
stop_slave(Node) -> stop_peer(Node) ->
rpc:call(Node, mria, leave, []), rpc:call(Node, mria, leave, []),
emqx_cth_peer:stop(Node). emqx_cth_peer:stop(Node).

View File

@ -588,7 +588,6 @@ cluster(Config) ->
[ [
{apps, [emqx_conf, emqx_rule_engine, emqx_bridge]}, {apps, [emqx_conf, emqx_rule_engine, emqx_bridge]},
{listener_ports, []}, {listener_ports, []},
{peer_mod, slave},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{load_schema, true}, {load_schema, true},
{start_autocluster, true}, {start_autocluster, true},
@ -611,7 +610,7 @@ start_cluster(Cluster) ->
Nodes = lists:map( Nodes = lists:map(
fun({Name, Opts}) -> fun({Name, Opts}) ->
ct:pal("starting ~p", [Name]), ct:pal("starting ~p", [Name]),
emqx_common_test_helpers:start_slave(Name, Opts) emqx_common_test_helpers:start_peer(Name, Opts)
end, end,
Cluster Cluster
), ),
@ -620,7 +619,7 @@ start_cluster(Cluster) ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
emqx_common_test_helpers:stop_slave(N) emqx_common_test_helpers:stop_peer(N)
end, end,
Nodes Nodes
) )

View File

@ -1069,20 +1069,12 @@ setup_and_start_listeners(Node, NodeOpts) ->
cluster(Config) -> cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config), PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(), ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
Cluster = emqx_common_test_helpers:emqx_cluster( Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core], [core, core],
[ [
{apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]}, {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]},
{listener_ports, []}, {listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{load_schema, true}, {load_schema, true},
{start_autocluster, true}, {start_autocluster, true},
@ -1744,14 +1736,14 @@ t_cluster_group(Config) ->
begin begin
Nodes = Nodes =
[_N1, N2 | _] = [ [_N1, N2 | _] = [
emqx_common_test_helpers:start_slave(Name, Opts) emqx_common_test_helpers:start_peer(Name, Opts)
|| {Name, Opts} <- Cluster || {Name, Opts} <- Cluster
], ],
on_exit(fun() -> on_exit(fun() ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_peer(N)
end, end,
Nodes Nodes
) )
@ -1827,10 +1819,10 @@ t_node_joins_existing_cluster(Config) ->
begin begin
[{Name1, Opts1}, {Name2, Opts2} | _] = Cluster, [{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
ct:pal("starting ~p", [Name1]), ct:pal("starting ~p", [Name1]),
N1 = emqx_common_test_helpers:start_slave(Name1, Opts1), N1 = emqx_common_test_helpers:start_peer(Name1, Opts1),
on_exit(fun() -> on_exit(fun() ->
ct:pal("stopping ~p", [N1]), ct:pal("stopping ~p", [N1]),
ok = emqx_common_test_helpers:stop_slave(N1) ok = emqx_common_test_helpers:stop_peer(N1)
end), end),
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
?wait_async_action( ?wait_async_action(
@ -1870,10 +1862,10 @@ t_node_joins_existing_cluster(Config) ->
30_000 30_000
), ),
ct:pal("starting ~p", [Name2]), ct:pal("starting ~p", [Name2]),
N2 = emqx_common_test_helpers:start_slave(Name2, Opts2), N2 = emqx_common_test_helpers:start_peer(Name2, Opts2),
on_exit(fun() -> on_exit(fun() ->
ct:pal("stopping ~p", [N2]), ct:pal("stopping ~p", [N2]),
ok = emqx_common_test_helpers:stop_slave(N2) ok = emqx_common_test_helpers:stop_peer(N2)
end), end),
Nodes = [N1, N2], Nodes = [N1, N2],
wait_for_cluster_rpc(N2), wait_for_cluster_rpc(N2),
@ -1963,7 +1955,7 @@ t_cluster_node_down(Config) ->
lists:map( lists:map(
fun({Name, Opts}) -> fun({Name, Opts}) ->
ct:pal("starting ~p", [Name]), ct:pal("starting ~p", [Name]),
emqx_common_test_helpers:start_slave(Name, Opts) emqx_common_test_helpers:start_peer(Name, Opts)
end, end,
Cluster Cluster
), ),
@ -1971,7 +1963,7 @@ t_cluster_node_down(Config) ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_peer(N)
end, end,
Nodes Nodes
) )
@ -2016,7 +2008,7 @@ t_cluster_node_down(Config) ->
{TId, Pid} = start_async_publisher(Config, KafkaTopic), {TId, Pid} = start_async_publisher(Config, KafkaTopic),
ct:pal("stopping node ~p", [N1]), ct:pal("stopping node ~p", [N1]),
ok = emqx_common_test_helpers:stop_slave(N1), ok = emqx_common_test_helpers:stop_peer(N1),
%% Give some time for the consumers in remaining node to %% Give some time for the consumers in remaining node to
%% rebalance. %% rebalance.

View File

@ -517,19 +517,11 @@ try_decode_json(Payload) ->
cluster(Config) -> cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config), PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
Cluster = emqx_common_test_helpers:emqx_cluster( Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core], [core, core],
[ [
{apps, [emqx_conf] ++ ?APPS ++ [pulsar]}, {apps, [emqx_conf] ++ ?APPS ++ [pulsar]},
{listener_ports, []}, {listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{load_schema, true}, {load_schema, true},
{start_autocluster, true}, {start_autocluster, true},
@ -551,7 +543,7 @@ cluster(Config) ->
start_cluster(Cluster) -> start_cluster(Cluster) ->
Nodes = Nodes =
[ [
emqx_common_test_helpers:start_slave(Name, Opts) emqx_common_test_helpers:start_peer(Name, Opts)
|| {Name, Opts} <- Cluster || {Name, Opts} <- Cluster
], ],
NumNodes = length(Nodes), NumNodes = length(Nodes),
@ -559,7 +551,7 @@ start_cluster(Cluster) ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_peer(N)
end, end,
Nodes Nodes
) )

View File

@ -222,16 +222,16 @@ assert_config_load_done(Nodes) ->
). ).
stop_cluster(Nodes) -> stop_cluster(Nodes) ->
emqx_utils:pmap(fun emqx_common_test_helpers:stop_slave/1, Nodes). emqx_utils:pmap(fun emqx_common_test_helpers:stop_peer/1, Nodes).
start_cluster(Specs) -> start_cluster(Specs) ->
[emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Specs]. [emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Specs].
start_cluster_async(Specs) -> start_cluster_async(Specs) ->
[ [
begin begin
Opts1 = maps:remove(join_to, Opts), Opts1 = maps:remove(join_to, Opts),
spawn_link(fun() -> emqx_common_test_helpers:start_slave(Name, Opts1) end), spawn_link(fun() -> emqx_common_test_helpers:start_peer(Name, Opts1) end),
timer:sleep(7_000) timer:sleep(7_000)
end end
|| {Name, Opts} <- Specs || {Name, Opts} <- Specs

View File

@ -50,9 +50,9 @@ end_per_suite(Config) ->
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
_ = emqx_eviction_agent:disable(test_eviction), _ = emqx_eviction_agent:disable(test_eviction),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
start_slave(Case, Config). start_peer(Case, Config).
start_slave(t_explicit_session_takeover, Config) -> start_peer(t_explicit_session_takeover, Config) ->
NodeNames = NodeNames =
[ [
t_explicit_session_takeover_donor, t_explicit_session_takeover_donor,
@ -65,19 +65,19 @@ start_slave(t_explicit_session_takeover, Config) ->
), ),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[{evacuate_nodes, ClusterNodes} | Config]; [{evacuate_nodes, ClusterNodes} | Config];
start_slave(_Case, Config) -> start_peer(_Case, Config) ->
Config. Config.
end_per_testcase(TestCase, Config) -> end_per_testcase(TestCase, Config) ->
emqx_eviction_agent:disable(test_eviction), emqx_eviction_agent:disable(test_eviction),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
stop_slave(TestCase, Config). stop_peer(TestCase, Config).
stop_slave(t_explicit_session_takeover, Config) -> stop_peer(t_explicit_session_takeover, Config) ->
emqx_eviction_agent_test_helpers:stop_cluster( emqx_eviction_agent_test_helpers:stop_cluster(
?config(evacuate_nodes, Config) ?config(evacuate_nodes, Config)
); );
stop_slave(_Case, _Config) -> stop_peer(_Case, _Config) ->
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -112,7 +112,7 @@ setup_test(TestCase, Config) when
end} end}
] ]
), ),
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], Nodes = [emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Cluster],
[{nodes, Nodes}, {cluster, Cluster}, {old_license, LicenseKey}]; [{nodes, Nodes}, {cluster, Cluster}, {old_license, LicenseKey}];
setup_test(_TestCase, _Config) -> setup_test(_TestCase, _Config) ->
[]. [].

View File

@ -42,8 +42,8 @@ t_cluster_query(_Config) ->
ct:timetrap({seconds, 120}), ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(), snabbkaffe:fix_ct_logging(),
[{Name, Opts}, {Name1, Opts1}] = cluster_specs(), [{Name, Opts}, {Name1, Opts1}] = cluster_specs(),
Node1 = emqx_common_test_helpers:start_slave(Name, Opts), Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
try try
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)], ClientLs1 = [start_emqtt_client(Node1, I, 2883) || I <- lists:seq(1, 10)],
@ -168,8 +168,8 @@ t_cluster_query(_Config) ->
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1),
_ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2) _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2)
after after
emqx_common_test_helpers:stop_slave(Node1), emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_slave(Node2) emqx_common_test_helpers:stop_peer(Node2)
end, end,
ok. ok.

View File

@ -194,8 +194,8 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) ->
snabbkaffe:fix_ct_logging(), snabbkaffe:fix_ct_logging(),
Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([core, core]), Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([core, core]),
ct:pal("Starting ~p", [Cluster]), ct:pal("Starting ~p", [Cluster]),
Node1 = emqx_common_test_helpers:start_slave(Name, Opts), Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
try try
L1 = get_tcp_listeners(Node1), L1 = get_tcp_listeners(Node1),
@ -214,8 +214,8 @@ t_api_listeners_list_not_ready(Config) when is_list(Config) ->
?assert(length(L1) > length(L2), Comment), ?assert(length(L1) > length(L2), Comment),
?assertEqual(length(L2), length(L3), Comment) ?assertEqual(length(L2), length(L3), Comment)
after after
emqx_common_test_helpers:stop_slave(Node1), emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_slave(Node2) emqx_common_test_helpers:stop_peer(Node2)
end. end.
t_clear_certs(Config) when is_list(Config) -> t_clear_certs(Config) when is_list(Config) ->

View File

@ -129,8 +129,8 @@ t_multiple_nodes_api(_) ->
Seq2 = list_to_atom(atom_to_list(?MODULE) ++ "2"), Seq2 = list_to_atom(atom_to_list(?MODULE) ++ "2"),
Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([{core, Seq1}, {core, Seq2}]), Cluster = [{Name, Opts}, {Name1, Opts1}] = cluster([{core, Seq1}, {core, Seq2}]),
ct:pal("Starting ~p", [Cluster]), ct:pal("Starting ~p", [Cluster]),
Node1 = emqx_common_test_helpers:start_slave(Name, Opts), Node1 = emqx_common_test_helpers:start_peer(Name, Opts),
Node2 = emqx_common_test_helpers:start_slave(Name1, Opts1), Node2 = emqx_common_test_helpers:start_peer(Name1, Opts1),
try try
{200, NodesList} = rpc:call(Node1, emqx_mgmt_api_nodes, nodes, [get, #{}]), {200, NodesList} = rpc:call(Node1, emqx_mgmt_api_nodes, nodes, [get, #{}]),
All = [Node1, Node2], All = [Node1, Node2],
@ -148,8 +148,8 @@ t_multiple_nodes_api(_) ->
]), ]),
?assertMatch(#{node := Node1}, Node11) ?assertMatch(#{node := Node1}, Node11)
after after
emqx_common_test_helpers:stop_slave(Node1), emqx_common_test_helpers:stop_peer(Node1),
emqx_common_test_helpers:stop_slave(Node2) emqx_common_test_helpers:stop_peer(Node2)
end, end,
ok. ok.

View File

@ -27,12 +27,12 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
Slave = emqx_common_test_helpers:start_slave(some_node, []), Peer = emqx_common_test_helpers:start_peer(node1, []),
[{slave, Slave} | Config]. [{peer, Peer} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
Slave = ?config(slave, Config), Peer = ?config(peer, Config),
emqx_common_test_helpers:stop_slave(Slave), emqx_common_test_helpers:stop_peer(Peer),
mria:clear_table(?ROUTE_TAB), mria:clear_table(?ROUTE_TAB),
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
@ -80,18 +80,18 @@ t_nodes_api(Config) ->
%% get topics/:topic %% get topics/:topic
%% We add another route here to ensure that the response handles %% We add another route here to ensure that the response handles
%% multiple routes for a single topic %% multiple routes for a single topic
Slave = ?config(slave, Config), Peer = ?config(peer, Config),
ok = emqx_router:add_route(Topic, Slave), ok = emqx_router:add_route(Topic, Peer),
RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]), RoutePath = emqx_mgmt_api_test_util:api_path(["topics", Topic]),
{ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath), {ok, RouteResponse} = emqx_mgmt_api_test_util:request_api(get, RoutePath),
ok = emqx_router:delete_route(Topic, Slave), ok = emqx_router:delete_route(Topic, Peer),
[ [
#{<<"topic">> := Topic, <<"node">> := Node1}, #{<<"topic">> := Topic, <<"node">> := Node1},
#{<<"topic">> := Topic, <<"node">> := Node2} #{<<"topic">> := Topic, <<"node">> := Node2}
] = emqx_utils_json:decode(RouteResponse, [return_maps]), ] = emqx_utils_json:decode(RouteResponse, [return_maps]),
?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])), ?assertEqual(lists:usort([Node, atom_to_binary(Peer)]), lists:usort([Node1, Node2])),
ok = emqtt:stop(Client). ok = emqtt:stop(Client).

View File

@ -136,7 +136,7 @@ t_rebalance_node_crash(Config) ->
?assertWaitEvent( ?assertWaitEvent(
begin begin
ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]), ok = rpc:call(DonorNode, emqx_node_rebalance, start, [Opts]),
emqx_common_test_helpers:stop_slave(RecipientNode) emqx_common_test_helpers:stop_peer(RecipientNode)
end, end,
#{?snk_kind := emqx_node_rebalance_started}, #{?snk_kind := emqx_node_rebalance_started},
1000 1000

View File

@ -628,11 +628,11 @@ group_t_copy_plugin_to_a_new_node({init, Config}) ->
load_schema => false load_schema => false
} }
), ),
CopyFromNode = emqx_common_test_helpers:start_slave( CopyFromNode = emqx_common_test_helpers:start_peer(
CopyFrom, maps:remove(join_to, CopyFromOpts) CopyFrom, maps:remove(join_to, CopyFromOpts)
), ),
ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]), ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]),
CopyToNode = emqx_common_test_helpers:start_slave(CopyTo, maps:remove(join_to, CopyToOpts)), CopyToNode = emqx_common_test_helpers:start_peer(CopyTo, maps:remove(join_to, CopyToOpts)),
ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]), ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]),
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]), ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]),
@ -662,8 +662,8 @@ group_t_copy_plugin_to_a_new_node({'end', Config}) ->
ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []), ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
rpc:call(CopyToNode, ekka, leave, []), rpc:call(CopyToNode, ekka, leave, []),
rpc:call(CopyFromNode, ekka, leave, []), rpc:call(CopyFromNode, ekka, leave, []),
ok = emqx_common_test_helpers:stop_slave(CopyToNode), ok = emqx_common_test_helpers:stop_peer(CopyToNode),
ok = emqx_common_test_helpers:stop_slave(CopyFromNode), ok = emqx_common_test_helpers:stop_peer(CopyFromNode),
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)), ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok = file:del_dir_r(proplists:get_value(from_install_dir, Config)); ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
group_t_copy_plugin_to_a_new_node(Config) -> group_t_copy_plugin_to_a_new_node(Config) ->
@ -737,7 +737,6 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
end, end,
priv_data_dir => PrivDataDir, priv_data_dir => PrivDataDir,
schema_mod => emqx_conf_schema, schema_mod => emqx_conf_schema,
peer_mod => slave,
load_schema => true load_schema => true
} }
), ),
@ -751,7 +750,7 @@ group_t_copy_plugin_to_a_new_node_single_node({init, Config}) ->
]; ];
group_t_copy_plugin_to_a_new_node_single_node({'end', Config}) -> group_t_copy_plugin_to_a_new_node_single_node({'end', Config}) ->
CopyToNode = proplists:get_value(copy_to_node_name, Config), CopyToNode = proplists:get_value(copy_to_node_name, Config),
ok = emqx_common_test_helpers:stop_slave(CopyToNode), ok = emqx_common_test_helpers:stop_peer(CopyToNode),
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)), ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok; ok;
group_t_copy_plugin_to_a_new_node_single_node(Config) -> group_t_copy_plugin_to_a_new_node_single_node(Config) ->
@ -762,7 +761,7 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
%% Start the node for the first time. The plugin should start %% Start the node for the first time. The plugin should start
%% successfully even if it's not extracted yet. Simply starting %% successfully even if it's not extracted yet. Simply starting
%% the node would crash if not working properly. %% the node would crash if not working properly.
CopyToNode = emqx_common_test_helpers:start_slave(CopyTo, CopyToOpts), CopyToNode = emqx_common_test_helpers:start_peer(CopyTo, CopyToOpts),
ct:pal("~p config:\n ~p", [ ct:pal("~p config:\n ~p", [
CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config, [[], #{}]) CopyToNode, erpc:call(CopyToNode, emqx_plugins, get_config, [[], #{}])
]), ]),
@ -805,11 +804,10 @@ group_t_cluster_leave({init, Config}) ->
end, end,
priv_data_dir => PrivDataDir, priv_data_dir => PrivDataDir,
schema_mod => emqx_conf_schema, schema_mod => emqx_conf_schema,
peer_mod => slave,
load_schema => true load_schema => true
} }
), ),
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster], Nodes = [emqx_common_test_helpers:start_peer(Name, Opts) || {Name, Opts} <- Cluster],
[ [
{to_install_dir, ToInstallDir}, {to_install_dir, ToInstallDir},
{cluster, Cluster}, {cluster, Cluster},
@ -820,7 +818,7 @@ group_t_cluster_leave({init, Config}) ->
]; ];
group_t_cluster_leave({'end', Config}) -> group_t_cluster_leave({'end', Config}) ->
Nodes = proplists:get_value(nodes, Config), Nodes = proplists:get_value(nodes, Config),
[ok = emqx_common_test_helpers:stop_slave(N) || N <- Nodes], [ok = emqx_common_test_helpers:stop_peer(N) || N <- Nodes],
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)), ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
ok; ok;
group_t_cluster_leave(Config) -> group_t_cluster_leave(Config) ->

View File

@ -348,19 +348,11 @@ receive_published(Line) ->
cluster(Config) -> cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config), PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
Cluster = emqx_common_test_helpers:emqx_cluster( Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core], [core, core],
[ [
{apps, ?APPS}, {apps, ?APPS},
{listener_ports, []}, {listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{load_schema, true}, {load_schema, true},
{start_autocluster, true}, {start_autocluster, true},
@ -382,7 +374,7 @@ cluster(Config) ->
start_cluster(Cluster) -> start_cluster(Cluster) ->
Nodes = [ Nodes = [
emqx_common_test_helpers:start_slave(Name, Opts) emqx_common_test_helpers:start_peer(Name, Opts)
|| {Name, Opts} <- Cluster || {Name, Opts} <- Cluster
], ],
NumNodes = length(Nodes), NumNodes = length(Nodes),
@ -390,7 +382,7 @@ start_cluster(Cluster) ->
emqx_utils:pmap( emqx_utils:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_peer(N)
end, end,
Nodes Nodes
) )

View File

@ -154,7 +154,7 @@ init_per_testcase(t_exhook_info, Config) ->
emqx_common_test_helpers:start_apps([emqx_exhook]), emqx_common_test_helpers:start_apps([emqx_exhook]),
Config; Config;
init_per_testcase(t_cluster_uuid, Config) -> init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1), Node = start_peer(n1),
[{n1, Node} | Config]; [{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) -> init_per_testcase(t_uuid_restored_from_file, Config) ->
Config; Config;
@ -210,7 +210,7 @@ end_per_testcase(t_exhook_info, _Config) ->
ok; ok;
end_per_testcase(t_cluster_uuid, Config) -> end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config), Node = proplists:get_value(n1, Config),
ok = stop_slave(Node); ok = stop_peer(Node);
end_per_testcase(t_num_clients, Config) -> end_per_testcase(t_num_clients, Config) ->
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
Config; Config;
@ -782,7 +782,7 @@ find_gen_rpc_port() ->
{ok, {_, Port}} = inet:sockname(EPort), {ok, {_, Port}} = inet:sockname(EPort),
Port. Port.
start_slave(Name) -> start_peer(Name) ->
Port = find_gen_rpc_port(), Port = find_gen_rpc_port(),
TestNode = node(), TestNode = node(),
Handler = Handler =
@ -811,11 +811,9 @@ start_slave(Name) ->
apps => [emqx, emqx_conf, emqx_retainer, emqx_modules, emqx_telemetry] apps => [emqx, emqx_conf, emqx_retainer, emqx_modules, emqx_telemetry]
}, },
emqx_common_test_helpers:start_slave(Name, Opts). emqx_common_test_helpers:start_peer(Name, Opts).
stop_slave(Node) -> stop_peer(Node) ->
% This line don't work!!
%emqx_cluster_rpc:fast_forward_to_commit(Node, 100),
rpc:call(Node, ?MODULE, leave_cluster, []), rpc:call(Node, ?MODULE, leave_cluster, []),
ok = emqx_cth_peer:stop(Node), ok = emqx_cth_peer:stop(Node),
?assertEqual([node()], mria:running_nodes()), ?assertEqual([node()], mria:running_nodes()),