feat(testing): Multinode testing helper

This commit is contained in:
Georgy Sychev 2022-05-05 00:03:40 +04:00
parent bc96c9fda7
commit c29eb8a409
4 changed files with 325 additions and 266 deletions

View File

@ -38,25 +38,32 @@
]).
-export([
ensure_mnesia_stopped/0,
wait_for/4,
change_emqx_opts/1,
change_emqx_opts/2,
client_ssl_twoway/0,
client_ssl_twoway/1,
client_ssl/0,
client_ssl/1,
wait_mqtt_payload/1,
not_wait_mqtt_payload/1,
render_config_file/2,
read_schema_configs/2,
client_ssl_twoway/0,
client_ssl_twoway/1,
ensure_mnesia_stopped/0,
ensure_quic_listener/2,
is_tcp_server_available/2,
is_tcp_server_available/3,
load_config/2,
load_config/3,
is_tcp_server_available/2,
is_tcp_server_available/3
not_wait_mqtt_payload/1,
read_schema_configs/2,
render_config_file/2,
wait_for/4,
wait_mqtt_payload/1
]).
-export([ensure_quic_listener/2]).
-export([
emqx_cluster/1,
emqx_cluster/2,
start_epmd/0,
start_slave/2,
stop_slave/1
]).
-define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
@ -542,3 +549,248 @@ ensure_quic_listener(Name, UdpPort) ->
ok -> ok;
{error, {already_started, _Pid}} -> ok
end.
%%
%% Clusterisation and multi-node testing
%%
emqx_cluster(Specs) ->
emqx_cluster(Specs, #{}).
emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) ->
emqx_cluster(Specs, maps:from_list(CommonOpts));
emqx_cluster(Specs0, CommonOpts) ->
Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
Specs = expand_node_specs(Specs1, CommonOpts),
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
%% Assign grpc ports:
GenRpcPorts = maps:from_list([
{node_name(Name), {tcp, gen_rpc_port(base_port(Num))}}
|| {{_, Name, _}, Num} <- Specs
]),
%% Set the default node of the cluster:
JoinTo =
case CoreNodes of
[First | _] -> First;
_ -> undefined
end,
[
{Name,
merge_opts(Opts, #{
base_port => base_port(Number),
join_to => JoinTo,
env => [
{mria, core_nodes, CoreNodes},
{mria, node_role, Role},
{gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
]
})}
|| {{Role, Name, Opts}, Number} <- Specs
].
%% Lower level starting API
start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start(
list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, erl_flags()}
]
),
pong = net_adm:ping(Node),
setup_node(Node, Opts),
Node.
%% Node stopping
stop_slave(Node) ->
ct_slave:stop(Node).
%% EPMD starting
start_epmd() ->
[] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
ok.
epmd_path() ->
case os:find_executable("epmd") of
false ->
ct:pal(critical, "Could not find epmd.~n"),
exit(epmd_not_found);
GlobalEpmd ->
GlobalEpmd
end.
%% Node initialization
setup_node(Node, Opts) when is_list(Opts) ->
setup_node(Node, maps:from_list(Opts));
setup_node(Node, Opts) when is_map(Opts) ->
%% Default base port is selected upon Node from 1100 to 65530 with step 10
BasePort = maps:get(base_port, Opts, 1100 + erlang:phash2(Node, 6553 - 110) * 10),
Apps = maps:get(apps, Opts, []),
StartApps = maps:get(start_apps, Opts, true),
JoinTo = maps:get(join_to, Opts, undefined),
EnvHandler = maps:get(env_handler, Opts, fun(_) -> ok end),
ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true),
LoadSchema = maps:get(load_schema, Opts, true),
LoadApps = maps:get(load_apps, Opts, [gen_rpc, emqx, ekka, mria] ++ Apps),
Env = maps:get(env, Opts, []),
Conf = maps:get(conf, Opts, []),
ListenerPorts = maps:get(listener_ports, Opts, [
{Type, listener_port(BasePort, Type)}
|| Type <- [tcp, ssl, ws, wss]
]),
%% Load env before doing anything to avoid overriding
[ok = rpc:call(Node, application, load, [App]) || App <- LoadApps],
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
%% in emqx_common_test_helpers:start_apps(...)
ConfigureGenRpc andalso
begin
ok = rpc:call(Node, application, set_env, [
gen_rpc, tcp_server_port, gen_rpc_port(BasePort)
]),
ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual])
end,
%% Setting env before starting any applications
[
ok = rpc:call(Node, application, set_env, [Application, Key, Value])
|| {Application, Key, Value} <- Env
],
%% Here we start the apps
EnvHandlerForRpc =
fun(App) ->
%% We load configuration, and than set the special enviroment variable
%% which says that emqx shouldn't load configuration at startup
%% Otherwise, configuration get's loaded and all preset env in envhandler is lost
LoadSchema andalso
begin
emqx_config:init_load(emqx_schema),
application:set_env(emqx, init_config_load_done, true)
end,
%% Need to set this otherwise listeners will conflict between each other
[
ok = emqx_config:put([listeners, Type, default, bind], {
{127, 0, 0, 1}, Port
})
|| {Type, Port} <- ListenerPorts
],
[ok = emqx_config:put(KeyPath, Value) || {KeyPath, Value} <- Conf],
ok = EnvHandler(App),
ok
end,
StartApps andalso
begin
ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandlerForRpc])
end,
%% Join the cluster if JoinTo is specified
case JoinTo of
undefined ->
ok;
_ ->
case rpc:call(Node, ekka, join, [JoinTo]) of
ok ->
ok;
ignore ->
ok;
Err ->
stop_slave(Node),
error({failed_to_join_cluster, #{node => Node, error => Err}})
end
end,
ok.
%% Helpers
node_name(Name) ->
list_to_atom(lists:concat([Name, "@", host()])).
gen_node_name(Num) ->
list_to_atom("autocluster_node" ++ integer_to_list(Num)).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
merge_opts(Opts1, Opts2) ->
maps:merge_with(
fun
(env, Env1, Env2) -> lists:usort(Env2 ++ Env1);
(conf, Conf1, Conf2) -> lists:usort(Conf2 ++ Conf1);
(apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
(load_apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1);
(_Option, _Old, Value) -> Value
end,
Opts1,
Opts2
).
erl_flags() ->
%% One core and redirecting logs to master
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch andalso
string:str(Path, "_build/default/plugins") =:= 0.
%% Ports
base_port(Number) ->
10000 + Number * 100.
gen_rpc_port(BasePort) ->
BasePort - 1.
listener_port(BasePort, tcp) ->
BasePort;
listener_port(BasePort, ssl) ->
BasePort + 1;
listener_port(BasePort, quic) ->
BasePort + 2;
listener_port(BasePort, ws) ->
BasePort + 3;
listener_port(BasePort, wss) ->
BasePort + 4.
%% Autocluster helpers
expand_node_specs(Specs, CommonOpts) ->
lists:map(
fun({Spec, Num}) ->
{
case Spec of
core ->
{core, gen_node_name(Num), CommonOpts};
replicant ->
{replicant, gen_node_name(Num), CommonOpts};
{Role, Name} when is_atom(Name) ->
{Role, Name, CommonOpts};
{Role, Opts} when is_list(Opts) ->
Opts1 = maps:from_list(Opts),
{Role, gen_node_name(Num), merge_opts(CommonOpts, Opts1)};
{Role, Name, Opts} when is_list(Opts) ->
Opts1 = maps:from_list(Opts),
{Role, Name, merge_opts(CommonOpts, Opts1)};
{Role, Opts} ->
{Role, gen_node_name(Num), merge_opts(CommonOpts, Opts)};
{Role, Name, Opts} ->
{Role, Name, merge_opts(CommonOpts, Opts)}
end,
Num
}
end,
Specs
).

View File

@ -35,7 +35,7 @@ init_per_suite(Config) ->
ignored ->
%% calling `net_kernel:start' without `epmd'
%% running will result in a failure.
start_epmd(),
emqx_common_test_helpers:start_epmd(),
{ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]),
Pid;
_ ->
@ -60,7 +60,7 @@ init_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_monitor_node_down
->
ok = snabbkaffe:start_trace(),
Slave = start_slave(some_node),
Slave = emqx_common_test_helpers:start_slave(some_node, []),
[{slave, Slave} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
@ -71,7 +71,7 @@ end_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_monitor_node_down
->
Slave = ?config(slave, Config),
stop_slave(Slave),
emqx_common_test_helpers:stop_slave(Slave),
mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end),
snabbkaffe:stop(),
ok;
@ -120,7 +120,7 @@ t_cleanup_monitor_node_down(Config) ->
emqx_router:add_route(<<"d/e/f">>, node()),
?assertMatch([_, _], emqx_router:topics()),
?wait_async_action(
stop_slave(Slave),
emqx_common_test_helpers:stop_slave(Slave),
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
1_000
),
@ -130,39 +130,3 @@ t_message(_) ->
?ROUTER_HELPER ! testing,
gen_server:cast(?ROUTER_HELPER, testing),
gen_server:call(?ROUTER_HELPER, testing).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
start_epmd() ->
[] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
ok.
epmd_path() ->
case os:find_executable("epmd") of
false ->
ct:pal(critical, "Could not find epmd.~n"),
exit(epmd_not_found);
GlobalEpmd ->
GlobalEpmd
end.
start_slave(Name) ->
% We want VMs to only occupy a single core
CommonBeamOpts = "+S 1:1 ",
{ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()),
Node.
stop_slave(Node) ->
slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.

View File

@ -28,17 +28,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
t_copy_conf_override_on_restarts(_Config) ->
net_kernel:start(['master@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster([core, core, core]),
%% 1. Start all nodes
Nodes = start_cluster(Cluster),
try
%% 1. Start all nodes
Nodes = start_cluster(Cluster),
[join_cluster(Spec) || Spec <- Cluster],
assert_config_load_done(Nodes),
%% 2. Stop each in order.
lists:foreach(fun stop_slave/1, Nodes),
stop_cluster(Nodes),
%% 3. Restart nodes in the same order. This should not
%% crash and eventually all nodes should be ready.
@ -50,7 +51,7 @@ t_copy_conf_override_on_restarts(_Config) ->
ok
after
teardown_cluster(Cluster)
stop_cluster(Nodes)
end.
%%------------------------------------------------------------------------------
@ -66,157 +67,37 @@ assert_config_load_done(Nodes) ->
Nodes
).
stop_cluster(Nodes) ->
[emqx_common_test_helpers:stop_slave(Node) || Node <- Nodes].
start_cluster(Specs) ->
[start_slave(I) || I <- Specs].
[emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Specs].
start_cluster_async(Specs) ->
[
begin
spawn_link(fun() -> start_slave(I) end),
Opts1 = maps:remove(join_to, Opts),
spawn_link(fun() -> emqx_common_test_helpers:start_slave(Name, Opts1) end),
timer:sleep(7_000)
end
|| I <- Specs
|| {Name, Opts} <- Specs
].
cluster(Specs) ->
cluster(Specs, []).
cluster(Specs0, CommonEnv) ->
Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
Specs = expand_node_specs(Specs1, CommonEnv),
CoreNodes = [node_id(Name) || {{core, Name, _}, _} <- Specs],
%% Assign grpc ports:
BaseGenRpcPort = 9000,
GenRpcPorts = maps:from_list([
{node_id(Name), {tcp, BaseGenRpcPort + Num}}
|| {{_, Name, _}, Num} <- Specs
]),
%% Set the default node of the cluster:
JoinTo =
case CoreNodes of
[First | _] -> #{join_to => First};
_ -> #{}
end,
[
JoinTo#{
name => Name,
node => node_id(Name),
env => [
{mria, core_nodes, CoreNodes},
{mria, node_role, Role},
{gen_rpc, tcp_server_port, BaseGenRpcPort + Number},
{gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
| Env
],
number => Number,
role => Role
}
|| {{Role, Name, Env}, Number} <- Specs
].
start_apps(Node) ->
Handler = fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ok;
(_) ->
ok
end,
{Node, ok} =
{Node, rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], Handler])},
ok.
stop_apps(Node) ->
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]).
join_cluster(#{node := Node, join_to := JoinTo}) ->
case rpc:call(Node, ekka, join, [JoinTo]) of
ok -> ok;
ignore -> ok;
Err -> error({failed_to_join_cluster, #{node => Node, error => Err}})
end.
start_slave(#{node := Node, env := Env}) ->
%% We want VMs to only occupy a single core
CommonBeamOpts =
"+S 1:1 " ++
%% redirect logs to the master test node
" -master " ++ atom_to_list(node()) ++ " ",
%% We use `ct_slave' instead of `slave' because, in
%% `t_copy_conf_override_on_restarts', the nodes might be stuck
%% some time during boot up, and `slave' has a hard-coded boot
%% timeout.
{ok, Node} = ct_slave:start(
Node,
[
{erl_flags, CommonBeamOpts ++ ebin_path()},
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 30_000},
{startup_timeout, 30_000}
]
),
%% Load apps before setting the enviroment variables to avoid
%% overriding the environment during app start:
[rpc:call(Node, application, load, [App]) || App <- [gen_rpc]],
%% Disable gen_rpc listener by default:
Env1 = [{gen_rpc, tcp_server_port, false} | Env],
setenv(Node, Env1),
ok = start_apps(Node),
Node.
expand_node_specs(Specs, CommonEnv) ->
lists:map(
fun({Spec, Num}) ->
{
case Spec of
core ->
{core, gen_node_name(Num), CommonEnv};
replicant ->
{replicant, gen_node_name(Num), CommonEnv};
{Role, Name} when is_atom(Name) ->
{Role, Name, CommonEnv};
{Role, Env} when is_list(Env) ->
{Role, gen_node_name(Num), CommonEnv ++ Env};
{Role, Name, Env} ->
{Role, Name, CommonEnv ++ Env}
end,
Num
}
end,
Specs
).
setenv(Node, Env) ->
[rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env].
teardown_cluster(Specs) ->
Nodes = [I || #{node := I} <- Specs],
[rpc:call(I, emqx_common_test_helpers, stop_apps, [emqx_conf]) || I <- Nodes],
[stop_slave(I) || I <- Nodes],
ok.
stop_slave(Node) ->
ct_slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
node_id(Name) ->
list_to_atom(lists:concat([Name, "@", host()])).
gen_node_name(N) ->
list_to_atom("n" ++ integer_to_list(N)).
ebin_path() ->
string:join(["-pa" | paths()], " ").
paths() ->
[
Path
|| Path <- code:get_path(),
string:prefix(Path, code:lib_dir()) =:= nomatch,
string:str(Path, "_build/default/plugins") =:= 0
].
Env = [
{emqx, init_config_load_done, false},
{emqx, boot_modules, []}
],
emqx_common_test_helpers:emqx_cluster(Specs, [
{env, Env},
{apps, [emqx_conf]},
{load_schema, false},
{join_to, false},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ok;
(_) ->
ok
end}
]).

View File

@ -33,6 +33,7 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]),
ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]),
meck:expect(
emqx_authz,
@ -159,7 +160,6 @@ init_per_testcase(t_exhook_info, Config) ->
Config;
init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1),
ok = setup_slave(Node),
[{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) ->
mock_httpc(),
@ -184,7 +184,6 @@ init_per_testcase(t_uuid_restored_from_file, Config) ->
fun set_special_configs/1
),
Node = start_slave(n1),
ok = setup_slave(Node),
[
{n1, Node},
{node_uuid, NodeUUID},
@ -801,12 +800,6 @@ set_special_configs(emqx_authz) ->
set_special_configs(_App) ->
ok.
start_slave(Name) ->
% We want VMs to only occupy a single core
CommonBeamOpts = "+S 1:1 ",
{ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()),
Node.
%% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port',
@ -822,57 +815,41 @@ find_gen_rpc_port() ->
{ok, {_, Port}} = inet:sockname(EPort),
Port.
setup_slave(Node) ->
TestNode = node(),
start_slave(Name) ->
Port = find_gen_rpc_port(),
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(
Node,
application,
set_env,
[gen_rpc, tcp_server_port, 9002]
),
ok = rpc:call(
Node,
application,
set_env,
[gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}]
),
ok = rpc:call(
Node,
application,
set_env,
[gen_rpc, port_discovery, manual]
),
TestNode = node(),
Handler =
fun
(emqx) ->
application:set_env(
emqx,
boot_modules,
[]
),
application:set_env(emqx, boot_modules, []),
ekka:join(TestNode),
emqx_common_test_helpers:load_config(
emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true}
),
ok;
(_) ->
(_App) ->
emqx_common_test_helpers:load_config(
emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true}
),
ok
end,
ok = rpc:call(
Node,
emqx_common_test_helpers,
load_config,
[emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true}]
),
ok = rpc:call(
Node,
emqx_common_test_helpers,
start_apps,
[
[emqx_conf, emqx_modules],
Handler
]
),
ok.
Opts = #{
env => [
{gen_rpc, tcp_server_port, 9002},
{gen_rpc, port_discovery, manual},
{gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}}
],
load_schema => false,
configure_gen_rpc => false,
env_handler => Handler,
load_apps => [gen_rpc, emqx],
listener_ports => [],
apps => [emqx_conf, emqx_modules]
},
emqx_common_test_helpers:start_slave(Name, Opts).
stop_slave(Node) ->
% This line don't work!!
@ -892,18 +869,3 @@ leave_cluster() ->
application:set_env(mria, db_backend, mnesia),
ekka:leave()
end.
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() ->
string:join(["-pa" | paths()], " ").
paths() ->
[
Path
|| Path <- code:get_path(),
string:prefix(Path, code:lib_dir()) =:= nomatch,
string:str(Path, "_build/default/plugins") =:= 0
].