test: improve cluster helper
1) Make each node have its own isolated data dir to avoid false negatives. 2) Allow parameterizing the peer module. 3) Fix cluster RPC after a node joins the cluster.
This commit is contained in:
parent
561c25f0e3
commit
b9e92173cf
|
@ -37,6 +37,7 @@
|
||||||
deps_path/2,
|
deps_path/2,
|
||||||
flush/0,
|
flush/0,
|
||||||
flush/1,
|
flush/1,
|
||||||
|
load/1,
|
||||||
render_and_load_app_config/1,
|
render_and_load_app_config/1,
|
||||||
render_and_load_app_config/2
|
render_and_load_app_config/2
|
||||||
]).
|
]).
|
||||||
|
@ -637,25 +638,53 @@ emqx_cluster(Specs0, CommonOpts) ->
|
||||||
%% Lower level starting API
|
%% Lower level starting API
|
||||||
|
|
||||||
-spec start_slave(shortname(), node_opts()) -> nodename().
|
-spec start_slave(shortname(), node_opts()) -> nodename().
|
||||||
start_slave(Name, Opts) ->
|
start_slave(Name, Opts) when is_list(Opts) ->
|
||||||
{ok, Node} = ct_slave:start(
|
start_slave(Name, maps:from_list(Opts));
|
||||||
list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
|
start_slave(Name, Opts) when is_map(Opts) ->
|
||||||
[
|
SlaveMod = maps:get(peer_mod, Opts, ct_slave),
|
||||||
{kill_if_fail, true},
|
Node = node_name(Name),
|
||||||
{monitor_master, true},
|
DoStart =
|
||||||
{init_timeout, 10000},
|
fun() ->
|
||||||
{startup_timeout, 10000},
|
case SlaveMod of
|
||||||
{erl_flags, erl_flags()}
|
ct_slave ->
|
||||||
]
|
ct_slave:start(
|
||||||
),
|
Node,
|
||||||
|
[
|
||||||
|
{kill_if_fail, true},
|
||||||
|
{monitor_master, true},
|
||||||
|
{init_timeout, 10000},
|
||||||
|
{startup_timeout, 10000},
|
||||||
|
{erl_flags, erl_flags()}
|
||||||
|
]
|
||||||
|
);
|
||||||
|
slave ->
|
||||||
|
slave:start_link(host(), Name, ebin_path())
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
case DoStart() of
|
||||||
|
{ok, _} ->
|
||||||
|
ok;
|
||||||
|
{error, started_not_connected, _} ->
|
||||||
|
ok;
|
||||||
|
Other ->
|
||||||
|
throw(Other)
|
||||||
|
end,
|
||||||
pong = net_adm:ping(Node),
|
pong = net_adm:ping(Node),
|
||||||
|
put_peer_mod(Node, SlaveMod),
|
||||||
setup_node(Node, Opts),
|
setup_node(Node, Opts),
|
||||||
|
ok = snabbkaffe:forward_trace(Node),
|
||||||
Node.
|
Node.
|
||||||
|
|
||||||
%% Node stopping
|
%% Node stopping
|
||||||
stop_slave(Node) ->
|
stop_slave(Node0) ->
|
||||||
ct_slave:stop(Node).
|
Node = node_name(Node0),
|
||||||
|
SlaveMod = get_peer_mod(Node),
|
||||||
|
erase_peer_mod(Node),
|
||||||
|
case SlaveMod:stop(Node) of
|
||||||
|
ok -> ok;
|
||||||
|
{ok, _} -> ok;
|
||||||
|
{error, not_started, _} -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
%% EPMD starting
|
%% EPMD starting
|
||||||
start_epmd() ->
|
start_epmd() ->
|
||||||
|
@ -693,9 +722,12 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
{Type, listener_port(BasePort, Type)}
|
{Type, listener_port(BasePort, Type)}
|
||||||
|| Type <- [tcp, ssl, ws, wss]
|
|| Type <- [tcp, ssl, ws, wss]
|
||||||
]),
|
]),
|
||||||
|
%% we need a fresh data dir for each peer node to avoid unintended
|
||||||
|
%% successes due to sharing of data in the cluster.
|
||||||
|
PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
|
||||||
|
|
||||||
%% Load env before doing anything to avoid overriding
|
%% Load env before doing anything to avoid overriding
|
||||||
[ok = rpc:call(Node, application, load, [App]) || App <- LoadApps],
|
lists:foreach(fun(App) -> rpc:call(Node, ?MODULE, load, [App]) end, LoadApps),
|
||||||
|
|
||||||
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
|
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
|
||||||
%% in emqx_common_test_helpers:start_apps(...)
|
%% in emqx_common_test_helpers:start_apps(...)
|
||||||
|
@ -721,7 +753,19 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
%% Otherwise, configuration gets loaded and all preset env in EnvHandler is lost
|
%% Otherwise, configuration gets loaded and all preset env in EnvHandler is lost
|
||||||
LoadSchema andalso
|
LoadSchema andalso
|
||||||
begin
|
begin
|
||||||
|
%% to avoid sharing data between executions and/or
|
||||||
|
%% nodes. these variables might notbe in the
|
||||||
|
%% config file (e.g.: emqx_ee_conf_schema).
|
||||||
|
NodeDataDir = filename:join([
|
||||||
|
PrivDataDir,
|
||||||
|
node(),
|
||||||
|
integer_to_list(erlang:unique_integer())
|
||||||
|
]),
|
||||||
|
os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
|
||||||
|
os:putenv("EMQX_NODE__COOKIE", atom_to_list(erlang:get_cookie())),
|
||||||
emqx_config:init_load(SchemaMod),
|
emqx_config:init_load(SchemaMod),
|
||||||
|
os:unsetenv("EMQX_NODE__DATA_DIR"),
|
||||||
|
os:unsetenv("EMQX_NODE__COOKIE"),
|
||||||
application:set_env(emqx, init_config_load_done, true)
|
application:set_env(emqx, init_config_load_done, true)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -750,6 +794,11 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
_ ->
|
_ ->
|
||||||
case rpc:call(Node, ekka, join, [JoinTo]) of
|
case rpc:call(Node, ekka, join, [JoinTo]) of
|
||||||
ok ->
|
ok ->
|
||||||
|
%% fix cluster rpc, as the conf app is not
|
||||||
|
%% restarted with the current test procedure.
|
||||||
|
StartApps andalso
|
||||||
|
lists:member(emqx_conf, Apps) andalso
|
||||||
|
(ok = erpc:call(Node, emqx_cluster_rpc, reset, [])),
|
||||||
ok;
|
ok;
|
||||||
ignore ->
|
ignore ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -762,8 +811,27 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
|
|
||||||
%% Helpers
|
%% Helpers
|
||||||
|
|
||||||
|
put_peer_mod(Node, SlaveMod) ->
|
||||||
|
put({?MODULE, Node}, SlaveMod),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
get_peer_mod(Node) ->
|
||||||
|
case get({?MODULE, Node}) of
|
||||||
|
undefined -> ct_slave;
|
||||||
|
SlaveMod -> SlaveMod
|
||||||
|
end.
|
||||||
|
|
||||||
|
erase_peer_mod(Node) ->
|
||||||
|
erase({?MODULE, Node}).
|
||||||
|
|
||||||
node_name(Name) ->
|
node_name(Name) ->
|
||||||
list_to_atom(lists:concat([Name, "@", host()])).
|
case string:tokens(atom_to_list(Name), "@") of
|
||||||
|
[_Name, _Host] ->
|
||||||
|
%% the name already has a @
|
||||||
|
Name;
|
||||||
|
_ ->
|
||||||
|
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
|
||||||
|
end.
|
||||||
|
|
||||||
gen_node_name(Num) ->
|
gen_node_name(Num) ->
|
||||||
list_to_atom("autocluster_node" ++ integer_to_list(Num)).
|
list_to_atom("autocluster_node" ++ integer_to_list(Num)).
|
||||||
|
|
|
@ -25,7 +25,6 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
t_copy_conf_override_on_restarts(_Config) ->
|
t_copy_conf_override_on_restarts(_Config) ->
|
||||||
net_kernel:start(['master@127.0.0.1', longnames]),
|
|
||||||
ct:timetrap({seconds, 120}),
|
ct:timetrap({seconds, 120}),
|
||||||
snabbkaffe:fix_ct_logging(),
|
snabbkaffe:fix_ct_logging(),
|
||||||
Cluster = cluster([core, core, core]),
|
Cluster = cluster([core, core, core]),
|
||||||
|
@ -165,11 +164,10 @@ cluster(Specs) ->
|
||||||
{env, Env},
|
{env, Env},
|
||||||
{apps, [emqx_conf]},
|
{apps, [emqx_conf]},
|
||||||
{load_schema, false},
|
{load_schema, false},
|
||||||
{join_to, false},
|
{join_to, true},
|
||||||
{env_handler, fun
|
{env_handler, fun
|
||||||
(emqx) ->
|
(emqx) ->
|
||||||
application:set_env(emqx, boot_modules, []),
|
application:set_env(emqx, boot_modules, []),
|
||||||
io:format("~p~p~n", [node(), application:get_all_env(emqx)]),
|
|
||||||
ok;
|
ok;
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
|
|
Loading…
Reference in New Issue