test: improve cluster node helpers
* Add option to start autocluster. This is useful for scenarios where a cluster is already running and has some configurations set (via config handler/cluster rpc) and later another node joins. * Improve mnesia data directory isolation for nodes. A new dir is set so that we may avoid intra and inter-suite flakiness due to dirty tables and schema. * The janitor is now called synchronously. This ensures the cleanup is done before the test pid dies.
This commit is contained in:
parent
d464e2aad5
commit
947e014132
|
@ -67,7 +67,8 @@
|
||||||
emqx_cluster/2,
|
emqx_cluster/2,
|
||||||
start_epmd/0,
|
start_epmd/0,
|
||||||
start_slave/2,
|
start_slave/2,
|
||||||
stop_slave/1
|
stop_slave/1,
|
||||||
|
listener_port/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([clear_screen/0]).
|
-export([clear_screen/0]).
|
||||||
|
@ -588,6 +589,12 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
|
||||||
%% Whether to execute `emqx_config:init_load(SchemaMod)`
|
%% Whether to execute `emqx_config:init_load(SchemaMod)`
|
||||||
%% default: true
|
%% default: true
|
||||||
load_schema => boolean(),
|
load_schema => boolean(),
|
||||||
|
%% If we want to exercise the scenario where a node joins an
|
||||||
|
%% existing cluster where there has already been some
|
||||||
|
%% configuration changes (via cluster rpc), then we need to enable
|
||||||
|
%% autocluster so that the joining node will restart the
|
||||||
|
%% `emqx_conf' app and correctly catch up the config.
|
||||||
|
start_autocluster => boolean(),
|
||||||
%% Eval by emqx_config:put/2
|
%% Eval by emqx_config:put/2
|
||||||
conf => [{KeyPath :: list(), Val :: term()}],
|
conf => [{KeyPath :: list(), Val :: term()}],
|
||||||
%% Fast option to config listener port
|
%% Fast option to config listener port
|
||||||
|
@ -725,9 +732,24 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
%% we need a fresh data dir for each peer node to avoid unintended
|
%% we need a fresh data dir for each peer node to avoid unintended
|
||||||
%% successes due to sharing of data in the cluster.
|
%% successes due to sharing of data in the cluster.
|
||||||
PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
|
PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
|
||||||
|
%% If we want to exercise the scenario where a node joins an
|
||||||
|
%% existing cluster where there has already been some
|
||||||
|
%% configuration changes (via cluster rpc), then we need to enable
|
||||||
|
%% autocluster so that the joining node will restart the
|
||||||
|
%% `emqx_conf' app and correctly catch up the config.
|
||||||
|
StartAutocluster = maps:get(start_autocluster, Opts, false),
|
||||||
|
|
||||||
%% Load env before doing anything to avoid overriding
|
%% Load env before doing anything to avoid overriding
|
||||||
lists:foreach(fun(App) -> rpc:call(Node, ?MODULE, load, [App]) end, LoadApps),
|
lists:foreach(fun(App) -> rpc:call(Node, ?MODULE, load, [App]) end, LoadApps),
|
||||||
|
%% Ensure a clean mnesia directory for each run to avoid
|
||||||
|
%% inter-test flakiness.
|
||||||
|
MnesiaDataDir = filename:join([
|
||||||
|
PrivDataDir,
|
||||||
|
node(),
|
||||||
|
integer_to_list(erlang:unique_integer()),
|
||||||
|
"mnesia"
|
||||||
|
]),
|
||||||
|
erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]),
|
||||||
|
|
||||||
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
|
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
|
||||||
%% in emqx_common_test_helpers:start_apps(...)
|
%% in emqx_common_test_helpers:start_apps(...)
|
||||||
|
@ -792,13 +814,10 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
|
StartAutocluster andalso
|
||||||
|
(ok = rpc:call(Node, emqx_machine_boot, start_autocluster, [])),
|
||||||
case rpc:call(Node, ekka, join, [JoinTo]) of
|
case rpc:call(Node, ekka, join, [JoinTo]) of
|
||||||
ok ->
|
ok ->
|
||||||
%% fix cluster rpc, as the conf app is not
|
|
||||||
%% restarted with the current test procedure.
|
|
||||||
StartApps andalso
|
|
||||||
lists:member(emqx_conf, Apps) andalso
|
|
||||||
(ok = erpc:call(Node, emqx_cluster_rpc, reset, [])),
|
|
||||||
ok;
|
ok;
|
||||||
ignore ->
|
ignore ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -872,6 +891,9 @@ base_port(Number) ->
|
||||||
gen_rpc_port(BasePort) ->
|
gen_rpc_port(BasePort) ->
|
||||||
BasePort - 1.
|
BasePort - 1.
|
||||||
|
|
||||||
|
listener_port(Opts, Type) when is_map(Opts) ->
|
||||||
|
BasePort = maps:get(base_port, Opts),
|
||||||
|
listener_port(BasePort, Type);
|
||||||
listener_port(BasePort, tcp) ->
|
listener_port(BasePort, tcp) ->
|
||||||
BasePort;
|
BasePort;
|
||||||
listener_port(BasePort, ssl) ->
|
listener_port(BasePort, ssl) ->
|
||||||
|
@ -1057,7 +1079,7 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
|
||||||
%% noise in the logs.
|
%% noise in the logs.
|
||||||
call_janitor() ->
|
call_janitor() ->
|
||||||
Janitor = get_or_spawn_janitor(),
|
Janitor = get_or_spawn_janitor(),
|
||||||
exit(Janitor, normal),
|
ok = emqx_test_janitor:stop(Janitor),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_or_spawn_janitor() ->
|
get_or_spawn_janitor() ->
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
|
stop/1,
|
||||||
push_on_exit_callback/2
|
push_on_exit_callback/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -40,6 +41,9 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link(?MODULE, self(), []).
|
gen_server:start_link(?MODULE, self(), []).
|
||||||
|
|
||||||
|
stop(Server) ->
|
||||||
|
gen_server:call(Server, terminate).
|
||||||
|
|
||||||
push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
|
push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
|
||||||
gen_server:call(Server, {push, Callback}).
|
gen_server:call(Server, {push, Callback}).
|
||||||
|
|
||||||
|
@ -56,6 +60,9 @@ terminate(_Reason, #{callbacks := Callbacks}) ->
|
||||||
|
|
||||||
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
|
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
|
||||||
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
|
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
|
||||||
|
handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
|
||||||
|
lists:foreach(fun(Fun) -> Fun() end, Callbacks),
|
||||||
|
{stop, normal, ok, State};
|
||||||
handle_call(_Req, _From, State) ->
|
handle_call(_Req, _From, State) ->
|
||||||
{reply, error, State}.
|
{reply, error, State}.
|
||||||
|
|
||||||
|
|
|
@ -559,8 +559,8 @@ group_t_copy_plugin_to_a_new_node({'end', Config}) ->
|
||||||
ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
|
ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []),
|
||||||
rpc:call(CopyToNode, ekka, leave, []),
|
rpc:call(CopyToNode, ekka, leave, []),
|
||||||
rpc:call(CopyFromNode, ekka, leave, []),
|
rpc:call(CopyFromNode, ekka, leave, []),
|
||||||
{ok, _} = emqx_common_test_helpers:stop_slave(CopyToNode),
|
ok = emqx_common_test_helpers:stop_slave(CopyToNode),
|
||||||
{ok, _} = emqx_common_test_helpers:stop_slave(CopyFromNode),
|
ok = emqx_common_test_helpers:stop_slave(CopyFromNode),
|
||||||
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
|
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
|
||||||
ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
|
ok = file:del_dir_r(proplists:get_value(from_install_dir, Config));
|
||||||
group_t_copy_plugin_to_a_new_node(Config) ->
|
group_t_copy_plugin_to_a_new_node(Config) ->
|
||||||
|
|
Loading…
Reference in New Issue