%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_conf_app_SUITE).

%% Test suite: export everything and silence the matching warning.
-compile([export_all, nowarn_export_all]).

-include("emqx_conf.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

%% Common Test callback: the list of test cases is computed by
%% `emqx_common_test_helpers:all/1' for this module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).

%% Starts a three-core cluster, stops every node, restarts the nodes in
%% the same order and checks that each node reports its initial config
%% load as done. The cluster is torn down whether or not the test passes.
t_copy_conf_override_on_restarts(_Config) ->
    ct:timetrap({seconds, 120}),
    snabbkaffe:fix_ct_logging(),
    Cluster = cluster([core, core, core]),
    try
        %% Step 1: start all nodes, join them and check they are ready.
        Nodes = start_cluster(Cluster),
        lists:foreach(fun join_cluster/1, Cluster),
        assert_config_load_done(Nodes),

        %% Step 2: stop each node, in start order.
        _ = [stop_slave(Node) || Node <- Nodes],

        %% Step 3: restart the nodes in the same order. This should not
        %% crash, and eventually all nodes should be ready.
        _ = start_cluster_async(Cluster),

        %% Give the restarted nodes time to finish booting.
        timer:sleep(15_000),

        assert_config_load_done(Nodes),

        ok
    after
        teardown_cluster(Cluster)
    end.

%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------

%% Asks every node in `Nodes', over rpc, for
%% `emqx_app:get_init_config_load_done()' and asserts that the answer is
%% `true'. The node name is attached to the assertion as a comment.
%% Returns `ok'.
assert_config_load_done(Nodes) ->
    _ = [
        begin
            Done = rpc:call(Node, emqx_app, get_init_config_load_done, []),
            ?assert(Done, #{node => Node})
        end
     || Node <- Nodes
    ],
    ok.

%% Starts one slave node per spec, one after the other, and returns the
%% started node names in spec order.
start_cluster([]) ->
    [];
start_cluster([Spec | Rest]) ->
    Node = start_slave(Spec),
    [Node | start_cluster(Rest)].

%% Starts the nodes without waiting for each boot to complete: every
%% spec is handed to a linked process that calls `start_slave/1', and
%% the caller pauses 7 seconds before launching the next one.
%% Returns one `ok' per spec.
start_cluster_async(Specs) ->
    Launch = fun(Spec) ->
        spawn_link(fun() -> start_slave(Spec) end),
        timer:sleep(7_000)
    end,
    [Launch(Spec) || Spec <- Specs].

%% Same as `cluster/2' with no extra environment shared by all nodes.
cluster(Specs) ->
    NoCommonEnv = [],
    cluster(Specs, NoCommonEnv).

%% Turns a list of short node specs into a list of maps, one per node,
%% with the keys `name', `node', `env', `number' and `role', plus
%% `join_to' when the cluster has at least one core node.
%%
%% Nodes are numbered from 1 in list order; node number N is given
%% gen_rpc port 9000 + N. `CommonEnv' is passed to `expand_node_specs/2'.
cluster(Specs0, CommonEnv) ->
    Indexes = lists:seq(1, length(Specs0)),
    Numbered = expand_node_specs(lists:zip(Specs0, Indexes), CommonEnv),
    CoreNodes = [node_id(CoreName) || {{core, CoreName, _}, _} <- Numbered],
    %% Assign gen_rpc ports:
    BasePort = 9000,
    PortOf = fun(Number) -> BasePort + Number end,
    PeerPorts = maps:from_list([
        {node_id(Name), {tcp, PortOf(Num)}}
     || {{_, Name, _}, Num} <- Numbered
    ]),
    %% The first core node, if any, is the one every node joins:
    Base =
        case CoreNodes of
            [] -> #{};
            [Seed | _] -> #{join_to => Seed}
        end,
    [
        maps:merge(Base, #{
            name => Name,
            node => node_id(Name),
            env => [
                {mria, core_nodes, CoreNodes},
                {mria, node_role, Role},
                {gen_rpc, tcp_server_port, PortOf(Number)},
                {gen_rpc, client_config_per_node, {internal, PeerPorts}}
                | Env
            ],
            number => Number,
            role => Role
        })
     || {{Role, Name, Env}, Number} <- Numbered
    ].

%% Starts `emqx_conf' on `Node' through
%% `emqx_common_test_helpers:start_apps/2'. The per-application handler
%% sets `boot_modules' to `[]' for the `emqx' application and does
%% nothing for any other one. Crashes with a badmatch that names the
%% node unless the remote call returns `ok'.
start_apps(Node) ->
    PerAppSetup = fun(App) ->
        App =:= emqx andalso application:set_env(emqx, boot_modules, []),
        ok
    end,
    Result = rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], PerAppSetup]),
    {Node, ok} = {Node, Result},
    ok.

%% Stops `emqx_conf' on `Node' through
%% `emqx_common_test_helpers:stop_apps/1'; crashes unless the remote
%% call returns `ok'.
stop_apps(Node) ->
    Apps = [emqx_conf],
    ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [Apps]).

%% Makes `Node' call `ekka:join(JoinTo)'. Both `ok' and `ignore' count
%% as success; any other result raises
%% `{failed_to_join_cluster, #{node, error}}'.
join_cluster(#{node := Node, join_to := JoinTo}) ->
    Outcome = rpc:call(Node, ekka, join, [JoinTo]),
    case lists:member(Outcome, [ok, ignore]) of
        true ->
            ok;
        false ->
            error({failed_to_join_cluster, #{node => Node, error => Outcome}})
    end.

%% Boots the node described by the spec with `ct_slave', applies its
%% application environment, starts the applications under test and
%% returns the node name.
start_slave(#{node := Node, env := Env}) ->
    Master = atom_to_list(node()),
    %% "+S 1:1": we want VMs to only occupy a single core.
    %% "-master": redirect logs to the master test node.
    ErlFlags = "+S 1:1 " ++ " -master " ++ Master ++ " " ++ ebin_path(),
    SlaveOpts = [
        {erl_flags, ErlFlags},
        {kill_if_fail, true},
        {monitor_master, true},
        {init_timeout, 30_000},
        {startup_timeout, 30_000}
    ],
    %% `ct_slave' is used instead of `slave' because, in
    %% `t_copy_conf_override_on_restarts', the nodes might be stuck for
    %% some time during boot, and `slave' has a hard-coded boot timeout.
    {ok, Node} = ct_slave:start(Node, SlaveOpts),

    %% Load apps before setting the environment variables to avoid
    %% overriding the environment during app start:
    _ = rpc:call(Node, application, load, [gen_rpc]),
    %% Disable the gen_rpc listener by default; the entry is prepended,
    %% so `Env' follows it in the list handed to `setenv/2':
    setenv(Node, [{gen_rpc, tcp_server_port, false} | Env]),
    ok = start_apps(Node),
    Node.

%% Expands each `{ShortSpec, Num}' pair into `{{Role, Name, Env}, Num}'.
%% See `expand_node_spec/3' for the accepted short forms.
expand_node_specs(Specs, CommonEnv) ->
    lists:map(
        fun({Spec, Num}) -> {expand_node_spec(Spec, Num, CommonEnv), Num} end,
        Specs
    ).

%% Expands one short spec into `{Role, Name, Env}'. A missing name is
%% generated from the node number; a spec-specific environment is
%% appended after `CommonEnv'.
expand_node_spec(core, Num, CommonEnv) ->
    {core, gen_node_name(Num), CommonEnv};
expand_node_spec(replicant, Num, CommonEnv) ->
    {replicant, gen_node_name(Num), CommonEnv};
expand_node_spec({Role, Name}, _Num, CommonEnv) when is_atom(Name) ->
    {Role, Name, CommonEnv};
expand_node_spec({Role, Env}, Num, CommonEnv) when is_list(Env) ->
    {Role, gen_node_name(Num), CommonEnv ++ Env};
expand_node_spec({Role, Name, Env}, _Num, CommonEnv) ->
    {Role, Name, CommonEnv ++ Env}.

%% Applies every `{App, Key, Val}' entry of `Env' on `Node', in list
%% order, with `application:set_env/3' over rpc. Entries that are not
%% 3-tuples are skipped. Returns the list of rpc results.
setenv(Node, Env) ->
    [
        rpc:call(Node, application, set_env, tuple_to_list(Entry))
     || {_App, _Key, _Val} = Entry <- Env
    ].

%% Best-effort cleanup: stops `emqx_conf' on every node named in
%% `Specs', then stops the nodes themselves. Always returns `ok'.
teardown_cluster(Specs) ->
    Nodes = [Node || #{node := Node} <- Specs],
    %% The rpc argument list must wrap the application list, i.e.
    %% `[[emqx_conf]]', so that the remote call is `stop_apps([emqx_conf])'
    %% as in `stop_apps/1' of this suite. Results are deliberately
    %% ignored because a node may already be down at this point.
    _ = [rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]) || Node <- Nodes],
    _ = [stop_slave(Node) || Node <- Nodes],
    ok.

%% Stops a node that was started with `ct_slave' and returns whatever
%% `ct_slave:stop/1' returns.
stop_slave(Node) ->
    StopResult = ct_slave:stop(Node),
    StopResult.

%% Returns the host part (the text after "@") of the current node name,
%% as a string. Crashes with a badmatch if the node name does not split
%% into exactly two parts.
host() ->
    %% `string:lexemes/2' replaces the obsolete `string:tokens/2'.
    [_Name, Host] = string:lexemes(atom_to_list(node()), "@"),
    Host.

%% Builds the full node name `Name@Host', as an atom, where Host is the
%% host of the current node.
node_id(Name) ->
    FullName = lists:concat([Name, "@", host()]),
    list_to_atom(FullName).

%% Generates the short node name for node number `N': the atom made of
%% the letter "n" followed by the number, e.g. `n1'.
gen_node_name(N) ->
    list_to_atom([$n | integer_to_list(N)]).

%% Returns a "-pa Dir1 Dir2 ..." command-line fragment, as a flat
%% string, listing the directories returned by `paths/0'.
ebin_path() ->
    Words = ["-pa" | paths()],
    lists:append(lists:join(" ", Words)).

%% Returns the entries of the current code path that neither start with
%% the OTP library directory (`code:lib_dir()') nor contain
%% "_build/default/plugins".
paths() ->
    %% Evaluated once instead of once per code path entry.
    LibDir = code:lib_dir(),
    [
        Path
     || Path <- code:get_path(),
        string:prefix(Path, LibDir) =:= nomatch,
        %% `string:find/2' replaces the obsolete `string:str/2'.
        string:find(Path, "_build/default/plugins") =:= nomatch
    ].