From c29eb8a4098fce024b0e0459681df27fc896fd4c Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Thu, 5 May 2022 00:03:40 +0400 Subject: [PATCH] feat(testing): Multinode testing helper --- apps/emqx/test/emqx_common_test_helpers.erl | 274 +++++++++++++++++- apps/emqx/test/emqx_router_helper_SUITE.erl | 44 +-- apps/emqx_conf/test/emqx_conf_app_SUITE.erl | 179 ++---------- .../test/emqx_telemetry_SUITE.erl | 94 ++---- 4 files changed, 325 insertions(+), 266 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 38064914c..dd9a56841 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -38,25 +38,32 @@ ]). -export([ - ensure_mnesia_stopped/0, - wait_for/4, change_emqx_opts/1, change_emqx_opts/2, - client_ssl_twoway/0, - client_ssl_twoway/1, client_ssl/0, client_ssl/1, - wait_mqtt_payload/1, - not_wait_mqtt_payload/1, - render_config_file/2, - read_schema_configs/2, + client_ssl_twoway/0, + client_ssl_twoway/1, + ensure_mnesia_stopped/0, + ensure_quic_listener/2, + is_tcp_server_available/2, + is_tcp_server_available/3, load_config/2, load_config/3, - is_tcp_server_available/2, - is_tcp_server_available/3 + not_wait_mqtt_payload/1, + read_schema_configs/2, + render_config_file/2, + wait_for/4, + wait_mqtt_payload/1 ]). --export([ensure_quic_listener/2]). +-export([ + emqx_cluster/1, + emqx_cluster/2, + start_epmd/0, + start_slave/2, + stop_slave/1 +]). -define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])). @@ -542,3 +549,248 @@ ensure_quic_listener(Name, UdpPort) -> ok -> ok; {error, {already_started, _Pid}} -> ok end. + +%% +%% Clusterisation and multi-node testing +%% + +emqx_cluster(Specs) -> + emqx_cluster(Specs, #{}). 
+ +emqx_cluster(Specs, CommonOpts) when is_list(CommonOpts) -> + emqx_cluster(Specs, maps:from_list(CommonOpts)); +emqx_cluster(Specs0, CommonOpts) -> + Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), + Specs = expand_node_specs(Specs1, CommonOpts), + CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], + %% Assign gen_rpc ports: + GenRpcPorts = maps:from_list([ + {node_name(Name), {tcp, gen_rpc_port(base_port(Num))}} + || {{_, Name, _}, Num} <- Specs + ]), + %% Pick the node that the others join: the first core node, if any: + JoinTo = + case CoreNodes of + [First | _] -> First; + _ -> undefined + end, + [ + {Name, + merge_opts(Opts, #{ + base_port => base_port(Number), + join_to => JoinTo, + env => [ + {mria, core_nodes, CoreNodes}, + {mria, node_role, Role}, + {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + ] + })} + || {{Role, Name, Opts}, Number} <- Specs + ]. + +%% Lower level starting API +start_slave(Name, Opts) -> + {ok, Node} = ct_slave:start( + list_to_atom(atom_to_list(Name) ++ "@" ++ host()), + [ + {kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, erl_flags()} + ] + ), + + pong = net_adm:ping(Node), + setup_node(Node, Opts), + Node. + +%% Node stopping +stop_slave(Node) -> + ct_slave:stop(Node). + +%% EPMD starting +start_epmd() -> + [] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"), + ok. + +epmd_path() -> + case os:find_executable("epmd") of + false -> + ct:pal(critical, "Could not find epmd.~n"), + exit(epmd_not_found); + GlobalEpmd -> + GlobalEpmd + end. 
+ +%% Node initialization + +setup_node(Node, Opts) when is_list(Opts) -> + setup_node(Node, maps:from_list(Opts)); +setup_node(Node, Opts) when is_map(Opts) -> + %% Default base port is derived from a hash of Node: 1100 to 65520 with step 10 + BasePort = maps:get(base_port, Opts, 1100 + erlang:phash2(Node, 6553 - 110) * 10), + Apps = maps:get(apps, Opts, []), + StartApps = maps:get(start_apps, Opts, true), + JoinTo = maps:get(join_to, Opts, undefined), + EnvHandler = maps:get(env_handler, Opts, fun(_) -> ok end), + ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true), + LoadSchema = maps:get(load_schema, Opts, true), + LoadApps = maps:get(load_apps, Opts, [gen_rpc, emqx, ekka, mria] ++ Apps), + Env = maps:get(env, Opts, []), + Conf = maps:get(conf, Opts, []), + ListenerPorts = maps:get(listener_ports, Opts, [ + {Type, listener_port(BasePort, Type)} + || Type <- [tcp, ssl, ws, wss] + ]), + + %% Load the apps before setting any env to avoid overriding it + [ok = rpc:call(Node, application, load, [App]) || App <- LoadApps], + + %% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler %% in emqx_common_test_helpers:start_apps(...) 
+ ConfigureGenRpc andalso + begin + ok = rpc:call(Node, application, set_env, [ + gen_rpc, tcp_server_port, gen_rpc_port(BasePort) + ]), + ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual]) + end, + + %% Setting env before starting any applications + [ + ok = rpc:call(Node, application, set_env, [Application, Key, Value]) + || {Application, Key, Value} <- Env + ], + + %% Here we start the apps + EnvHandlerForRpc = + fun(App) -> + %% We load configuration, and then set the special environment variable + %% which says that emqx shouldn't load configuration at startup + %% Otherwise, configuration gets loaded and all preset env in envhandler is lost + LoadSchema andalso + begin + emqx_config:init_load(emqx_schema), + application:set_env(emqx, init_config_load_done, true) + end, + + %% Need to set this otherwise listeners will conflict between each other + [ + ok = emqx_config:put([listeners, Type, default, bind], { + {127, 0, 0, 1}, Port + }) + || {Type, Port} <- ListenerPorts + ], + + [ok = emqx_config:put(KeyPath, Value) || {KeyPath, Value} <- Conf], + ok = EnvHandler(App), + ok + end, + + StartApps andalso + begin + ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [Apps, EnvHandlerForRpc]) + end, + + %% Join the cluster if JoinTo is specified + case JoinTo of + undefined -> + ok; + _ -> + case rpc:call(Node, ekka, join, [JoinTo]) of + ok -> + ok; + ignore -> + ok; + Err -> + stop_slave(Node), + error({failed_to_join_cluster, #{node => Node, error => Err}}) + end + end, + ok. + +%% Helpers + +node_name(Name) -> + list_to_atom(lists:concat([Name, "@", host()])). + +gen_node_name(Num) -> + list_to_atom("autocluster_node" ++ integer_to_list(Num)). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. 
+ +merge_opts(Opts1, Opts2) -> + maps:merge_with( + fun + (env, Env1, Env2) -> lists:usort(Env2 ++ Env1); + (conf, Conf1, Conf2) -> lists:usort(Conf2 ++ Conf1); + (apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1); + (load_apps, Apps1, Apps2) -> lists:usort(Apps2 ++ Apps1); + (_Option, _Old, Value) -> Value + end, + Opts1, + Opts2 + ). + +erl_flags() -> + %% One core and redirecting logs to master + "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path(). + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch andalso + string:str(Path, "_build/default/plugins") =:= 0. + +%% Ports + +base_port(Number) -> + 10000 + Number * 100. + +gen_rpc_port(BasePort) -> + BasePort - 1. + +listener_port(BasePort, tcp) -> + BasePort; +listener_port(BasePort, ssl) -> + BasePort + 1; +listener_port(BasePort, quic) -> + BasePort + 2; +listener_port(BasePort, ws) -> + BasePort + 3; +listener_port(BasePort, wss) -> + BasePort + 4. + +%% Autocluster helpers + +expand_node_specs(Specs, CommonOpts) -> + lists:map( + fun({Spec, Num}) -> + { + case Spec of + core -> + {core, gen_node_name(Num), CommonOpts}; + replicant -> + {replicant, gen_node_name(Num), CommonOpts}; + {Role, Name} when is_atom(Name) -> + {Role, Name, CommonOpts}; + {Role, Opts} when is_list(Opts) -> + Opts1 = maps:from_list(Opts), + {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts1)}; + {Role, Name, Opts} when is_list(Opts) -> + Opts1 = maps:from_list(Opts), + {Role, Name, merge_opts(CommonOpts, Opts1)}; + {Role, Opts} -> + {Role, gen_node_name(Num), merge_opts(CommonOpts, Opts)}; + {Role, Name, Opts} -> + {Role, Name, merge_opts(CommonOpts, Opts)} + end, + Num + } + end, + Specs + ). 
diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 4c96091c5..70f07e0e3 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -35,7 +35,7 @@ init_per_suite(Config) -> ignored -> %% calling `net_kernel:start' without `epmd' %% running will result in a failure. - start_epmd(), + emqx_common_test_helpers:start_epmd(), {ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]), Pid; _ -> @@ -60,7 +60,7 @@ init_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> ok = snabbkaffe:start_trace(), - Slave = start_slave(some_node), + Slave = emqx_common_test_helpers:start_slave(some_node, []), [{slave, Slave} | Config]; init_per_testcase(_TestCase, Config) -> Config. @@ -71,7 +71,7 @@ end_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> Slave = ?config(slave, Config), - stop_slave(Slave), + emqx_common_test_helpers:stop_slave(Slave), mria:transaction(?ROUTE_SHARD, fun() -> mnesia:clear_table(?ROUTE_TAB) end), snabbkaffe:stop(), ok; @@ -120,7 +120,7 @@ t_cleanup_monitor_node_down(Config) -> emqx_router:add_route(<<"d/e/f">>, node()), ?assertMatch([_, _], emqx_router:topics()), ?wait_async_action( - stop_slave(Slave), + emqx_common_test_helpers:stop_slave(Slave), #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave}, 1_000 ), @@ -130,39 +130,3 @@ t_message(_) -> ?ROUTER_HELPER ! testing, gen_server:cast(?ROUTER_HELPER, testing), gen_server:call(?ROUTER_HELPER, testing). - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -start_epmd() -> - [] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"), - ok. 
- -epmd_path() -> - case os:find_executable("epmd") of - false -> - ct:pal(critical, "Could not find epmd.~n"), - exit(epmd_not_found); - GlobalEpmd -> - GlobalEpmd - end. - -start_slave(Name) -> - % We want VMs to only occupy a single core - CommonBeamOpts = "+S 1:1 ", - {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()), - Node. - -stop_slave(Node) -> - slave:stop(Node). - -host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), - Host. - -ebin_path() -> - string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). - -is_lib(Path) -> - string:prefix(Path, code:lib_dir()) =:= nomatch. diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 29b1404e8..f10e42759 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -28,17 +28,18 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_copy_conf_override_on_restarts(_Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), ct:timetrap({seconds, 120}), snabbkaffe:fix_ct_logging(), Cluster = cluster([core, core, core]), + + %% 1. Start all nodes + Nodes = start_cluster(Cluster), try - %% 1. Start all nodes - Nodes = start_cluster(Cluster), - [join_cluster(Spec) || Spec <- Cluster], assert_config_load_done(Nodes), %% 2. Stop each in order. - lists:foreach(fun stop_slave/1, Nodes), + stop_cluster(Nodes), %% 3. Restart nodes in the same order. This should not %% crash and eventually all nodes should be ready. @@ -50,7 +51,7 @@ t_copy_conf_override_on_restarts(_Config) -> ok after - teardown_cluster(Cluster) + stop_cluster(Nodes) end. %%------------------------------------------------------------------------------ @@ -66,157 +67,37 @@ assert_config_load_done(Nodes) -> Nodes ). +stop_cluster(Nodes) -> + [emqx_common_test_helpers:stop_slave(Node) || Node <- Nodes]. + start_cluster(Specs) -> - [start_slave(I) || I <- Specs]. 
+ [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Specs]. start_cluster_async(Specs) -> [ begin - spawn_link(fun() -> start_slave(I) end), + Opts1 = maps:remove(join_to, Opts), + spawn_link(fun() -> emqx_common_test_helpers:start_slave(Name, Opts1) end), timer:sleep(7_000) end - || I <- Specs + || {Name, Opts} <- Specs ]. cluster(Specs) -> - cluster(Specs, []). - -cluster(Specs0, CommonEnv) -> - Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), - Specs = expand_node_specs(Specs1, CommonEnv), - CoreNodes = [node_id(Name) || {{core, Name, _}, _} <- Specs], - %% Assign grpc ports: - BaseGenRpcPort = 9000, - GenRpcPorts = maps:from_list([ - {node_id(Name), {tcp, BaseGenRpcPort + Num}} - || {{_, Name, _}, Num} <- Specs - ]), - %% Set the default node of the cluster: - JoinTo = - case CoreNodes of - [First | _] -> #{join_to => First}; - _ -> #{} - end, - [ - JoinTo#{ - name => Name, - node => node_id(Name), - env => [ - {mria, core_nodes, CoreNodes}, - {mria, node_role, Role}, - {gen_rpc, tcp_server_port, BaseGenRpcPort + Number}, - {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} - | Env - ], - number => Number, - role => Role - } - || {{Role, Name, Env}, Number} <- Specs - ]. - -start_apps(Node) -> - Handler = fun - (emqx) -> - application:set_env(emqx, boot_modules, []), - ok; - (_) -> - ok - end, - {Node, ok} = - {Node, rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], Handler])}, - ok. - -stop_apps(Node) -> - ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]). - -join_cluster(#{node := Node, join_to := JoinTo}) -> - case rpc:call(Node, ekka, join, [JoinTo]) of - ok -> ok; - ignore -> ok; - Err -> error({failed_to_join_cluster, #{node => Node, error => Err}}) - end. 
- -start_slave(#{node := Node, env := Env}) -> - %% We want VMs to only occupy a single core - CommonBeamOpts = - "+S 1:1 " ++ - %% redirect logs to the master test node - " -master " ++ atom_to_list(node()) ++ " ", - %% We use `ct_slave' instead of `slave' because, in - %% `t_copy_conf_override_on_restarts', the nodes might be stuck - %% some time during boot up, and `slave' has a hard-coded boot - %% timeout. - {ok, Node} = ct_slave:start( - Node, - [ - {erl_flags, CommonBeamOpts ++ ebin_path()}, - {kill_if_fail, true}, - {monitor_master, true}, - {init_timeout, 30_000}, - {startup_timeout, 30_000} - ] - ), - - %% Load apps before setting the enviroment variables to avoid - %% overriding the environment during app start: - [rpc:call(Node, application, load, [App]) || App <- [gen_rpc]], - %% Disable gen_rpc listener by default: - Env1 = [{gen_rpc, tcp_server_port, false} | Env], - setenv(Node, Env1), - ok = start_apps(Node), - Node. - -expand_node_specs(Specs, CommonEnv) -> - lists:map( - fun({Spec, Num}) -> - { - case Spec of - core -> - {core, gen_node_name(Num), CommonEnv}; - replicant -> - {replicant, gen_node_name(Num), CommonEnv}; - {Role, Name} when is_atom(Name) -> - {Role, Name, CommonEnv}; - {Role, Env} when is_list(Env) -> - {Role, gen_node_name(Num), CommonEnv ++ Env}; - {Role, Name, Env} -> - {Role, Name, CommonEnv ++ Env} - end, - Num - } - end, - Specs - ). - -setenv(Node, Env) -> - [rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env]. - -teardown_cluster(Specs) -> - Nodes = [I || #{node := I} <- Specs], - [rpc:call(I, emqx_common_test_helpers, stop_apps, [emqx_conf]) || I <- Nodes], - [stop_slave(I) || I <- Nodes], - ok. - -stop_slave(Node) -> - ct_slave:stop(Node). - -host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), - Host. - -node_id(Name) -> - list_to_atom(lists:concat([Name, "@", host()])). - -gen_node_name(N) -> - list_to_atom("n" ++ integer_to_list(N)). 
- -ebin_path() -> - string:join(["-pa" | paths()], " "). - -paths() -> - [ - Path - || Path <- code:get_path(), - string:prefix(Path, code:lib_dir()) =:= nomatch, - string:str(Path, "_build/default/plugins") =:= 0 - ]. + Env = [ + {emqx, init_config_load_done, false}, + {emqx, boot_modules, []} + ], + emqx_common_test_helpers:emqx_cluster(Specs, [ + {env, Env}, + {apps, [emqx_conf]}, + {load_schema, false}, + {join_to, false}, + {env_handler, fun + (emqx) -> + application:set_env(emqx, boot_modules, []), + ok; + (_) -> + ok + end} + ]). diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 022fb0b5f..cd25a3f51 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -33,6 +33,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]), meck:expect( emqx_authz, @@ -159,7 +160,6 @@ init_per_testcase(t_exhook_info, Config) -> Config; init_per_testcase(t_cluster_uuid, Config) -> Node = start_slave(n1), - ok = setup_slave(Node), [{n1, Node} | Config]; init_per_testcase(t_uuid_restored_from_file, Config) -> mock_httpc(), @@ -184,7 +184,6 @@ init_per_testcase(t_uuid_restored_from_file, Config) -> fun set_special_configs/1 ), Node = start_slave(n1), - ok = setup_slave(Node), [ {n1, Node}, {node_uuid, NodeUUID}, @@ -801,12 +800,6 @@ set_special_configs(emqx_authz) -> set_special_configs(_App) -> ok. -start_slave(Name) -> - % We want VMs to only occupy a single core - CommonBeamOpts = "+S 1:1 ", - {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()), - Node. 
- %% for some unknown reason, gen_rpc running locally or in CI might %% start with different `port_discovery' modes, which means that'll %% either be listening at the port in the config (`tcp_server_port', @@ -822,57 +815,41 @@ find_gen_rpc_port() -> {ok, {_, Port}} = inet:sockname(EPort), Port. -setup_slave(Node) -> - TestNode = node(), +start_slave(Name) -> Port = find_gen_rpc_port(), - [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], - ok = rpc:call( - Node, - application, - set_env, - [gen_rpc, tcp_server_port, 9002] - ), - ok = rpc:call( - Node, - application, - set_env, - [gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}] - ), - ok = rpc:call( - Node, - application, - set_env, - [gen_rpc, port_discovery, manual] - ), + TestNode = node(), Handler = fun (emqx) -> - application:set_env( - emqx, - boot_modules, - [] - ), + application:set_env(emqx, boot_modules, []), ekka:join(TestNode), + emqx_common_test_helpers:load_config( + emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true} + ), + ok; - (_) -> + (_App) -> + emqx_common_test_helpers:load_config( + emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true} + ), ok end, - ok = rpc:call( - Node, - emqx_common_test_helpers, - load_config, - [emqx_modules_schema, ?BASE_CONF, #{raw_with_default => true}] - ), - ok = rpc:call( - Node, - emqx_common_test_helpers, - start_apps, - [ - [emqx_conf, emqx_modules], - Handler - ] - ), - ok. + Opts = #{ + env => [ + {gen_rpc, tcp_server_port, 9002}, + {gen_rpc, port_discovery, manual}, + {gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}} + ], + + load_schema => false, + configure_gen_rpc => false, + env_handler => Handler, + load_apps => [gen_rpc, emqx], + listener_ports => [], + apps => [emqx_conf, emqx_modules] + }, + + emqx_common_test_helpers:start_slave(Name, Opts). stop_slave(Node) -> % This line don't work!! 
@@ -892,18 +869,3 @@ leave_cluster() -> application:set_env(mria, db_backend, mnesia), ekka:leave() end. - -host() -> - [_, Host] = string:tokens(atom_to_list(node()), "@"), - Host. - -ebin_path() -> - string:join(["-pa" | paths()], " "). - -paths() -> - [ - Path - || Path <- code:get_path(), - string:prefix(Path, code:lib_dir()) =:= nomatch, - string:str(Path, "_build/default/plugins") =:= 0 - ].