Merge pull request #7731 from thalesmg/fix-copy-conf-override-startup

fix(emqx_conf): avoid crash/deadlock depending on node startup order
This commit is contained in:
Thales Macedo Garitezi 2022-04-27 10:01:06 -03:00 committed by GitHub
commit 1be9d4f48f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 326 additions and 30 deletions

View File

@ -42,6 +42,8 @@
code_change/3
]).
-export([get_tables_status/0]).
-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
-ifdef(TEST).
@ -172,6 +174,29 @@ get_node_tnx_id(Node) ->
[#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
end.
%% Checks whether the Mnesia tables used by this module are waiting to
%% be loaded and from where.
-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}.
get_tables_status() ->
maps:from_list([
{Tab, do_get_tables_status(Tab)}
|| Tab <- [?CLUSTER_COMMIT, ?CLUSTER_MFA]
]).
do_get_tables_status(Tab) ->
Props = mnesia:table_info(Tab, all),
TabNodes = proplists:get_value(all_nodes, Props),
KnownDown = mnesia_recover:get_mnesia_downs(),
LocalNode = node(),
case proplists:get_value(load_node, Props) of
unknown ->
{waiting, TabNodes -- [LocalNode | KnownDown]};
LocalNode ->
{disc, LocalNode};
Node ->
{network, Node}
end.
%% Regardless of what MFA is returned, consider it a success),
%% then move to the next tnxId.
%% if the next TnxId failed, need call the function again to skip.

View File

@ -24,6 +24,8 @@
-include_lib("emqx/include/logger.hrl").
-include("emqx_conf.hrl").
-define(DEFAULT_INIT_TXN_ID, -1).
start(_StartType, _StartArgs) ->
init_conf(),
emqx_conf_sup:start_link().
@ -31,19 +33,48 @@ start(_StartType, _StartArgs) ->
stop(_State) ->
ok.
%% internal functions
get_override_config_file() ->
Node = node(),
case emqx_app:get_init_config_load_done() of
false ->
{error, #{node => Node, msg => "init_conf_load_not_done"}};
true ->
case erlang:whereis(emqx_config_handler) of
undefined ->
{error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
_ ->
Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
WallClock = erlang:statistics(wall_clock),
Conf = emqx_config_handler:get_raw_cluster_override_conf(),
#{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node}
end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, #{node => Node, msg => Reason}}
end
end
end.
%% ------------------------------------------------------------------------------
%% Internal functions
%% ------------------------------------------------------------------------------
init_conf() ->
{ok, TnxId} = copy_override_conf_from_core_node(),
emqx_app:set_init_tnx_id(TnxId),
emqx_config:init_load(emqx_conf:schema_module()),
emqx_app:set_init_config_load_done().
cluster_nodes() ->
maps:get(running_nodes, ekka_cluster:info()) -- [node()].
copy_override_conf_from_core_node() ->
case mria_mnesia:running_nodes() -- [node()] of
case cluster_nodes() of
%% The first core nodes is self.
[] ->
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
{ok, -1};
{ok, ?DEFAULT_INIT_TXN_ID};
Nodes ->
{Results, Failed} = emqx_conf_proto_v1:get_override_config_file(Nodes),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
@ -64,12 +95,39 @@ copy_override_conf_from_core_node() ->
[] ->
%% Other core nodes running but no one replicated it successfully.
?SLOG(error, #{
msg => "copy_overide_conf_from_core_node_failed",
msg => "copy_override_conf_from_core_node_failed",
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{error, "core node not ready"};
case should_proceed_with_boot() of
true ->
%% Act as if this node is alone, so it can
%% finish the boot sequence and load the
%% config for other nodes to copy it.
?SLOG(info, #{
msg => "skip_copy_overide_conf_from_core_node",
loading_from_disk => true,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{ok, ?DEFAULT_INIT_TXN_ID};
false ->
%% retry in some time
Jitter = rand:uniform(2_000),
Timeout = 10_000 + Jitter,
?SLOG(info, #{
msg => "copy_overide_conf_from_core_node_retry",
timeout => Timeout,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
timer:sleep(Timeout),
copy_override_conf_from_core_node()
end;
_ ->
SortFun = fun(
{ok, #{wall_clock := W1}},
@ -79,7 +137,10 @@ copy_override_conf_from_core_node() ->
end,
[{ok, Info} | _] = lists:sort(SortFun, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node},
Msg = #{
msg => "copy_overide_conf_from_core_node_success",
node => Node
},
?SLOG(debug, Msg),
ok = emqx_config:save_to_override_conf(
RawOverrideConf,
@ -89,28 +150,16 @@ copy_override_conf_from_core_node() ->
end
end.
get_override_config_file() ->
Node = node(),
Role = mria_rlog:role(),
case emqx_app:get_init_config_load_done() of
false ->
{error, #{node => Node, msg => "init_conf_load_not_done"}};
true when Role =:= core ->
case erlang:whereis(emqx_config_handler) of
undefined ->
{error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
_ ->
Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
WallClock = erlang:statistics(wall_clock),
Conf = emqx_config_handler:get_raw_cluster_override_conf(),
#{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node}
end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, #{node => Node, msg => Reason}}
end
end;
true when Role =:= replicant ->
{ignore, #{node => Node}}
should_proceed_with_boot() ->
TablesStatus = emqx_cluster_rpc:get_tables_status(),
LocalNode = node(),
case maps:get(?CLUSTER_COMMIT, TablesStatus) of
{disc, LocalNode} ->
%% Loading locally; let this node finish its boot sequence
%% so others can copy the config from this one.
true;
_ ->
%% Loading from another node or still waiting for nodes to
%% be up. Try again.
false
end.

View File

@ -0,0 +1,222 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_app_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_conf.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
t_copy_conf_override_on_restarts(_Config) ->
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster([core, core, core]),
try
%% 1. Start all nodes
Nodes = start_cluster(Cluster),
[join_cluster(Spec) || Spec <- Cluster],
assert_config_load_done(Nodes),
%% 2. Stop each in order.
lists:foreach(fun stop_slave/1, Nodes),
%% 3. Restart nodes in the same order. This should not
%% crash and eventually all nodes should be ready.
start_cluster_async(Cluster),
timer:sleep(15_000),
assert_config_load_done(Nodes),
ok
after
teardown_cluster(Cluster)
end.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
assert_config_load_done(Nodes) ->
lists:foreach(
fun(Node) ->
Done = rpc:call(Node, emqx_app, get_init_config_load_done, []),
?assert(Done, #{node => Node})
end,
Nodes
).
start_cluster(Specs) ->
[start_slave(I) || I <- Specs].
start_cluster_async(Specs) ->
[
begin
spawn_link(fun() -> start_slave(I) end),
timer:sleep(7_000)
end
|| I <- Specs
].
cluster(Specs) ->
cluster(Specs, []).
cluster(Specs0, CommonEnv) ->
Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
Specs = expand_node_specs(Specs1, CommonEnv),
CoreNodes = [node_id(Name) || {{core, Name, _}, _} <- Specs],
%% Assign grpc ports:
BaseGenRpcPort = 9000,
GenRpcPorts = maps:from_list([
{node_id(Name), {tcp, BaseGenRpcPort + Num}}
|| {{_, Name, _}, Num} <- Specs
]),
%% Set the default node of the cluster:
JoinTo =
case CoreNodes of
[First | _] -> #{join_to => First};
_ -> #{}
end,
[
JoinTo#{
name => Name,
node => node_id(Name),
env => [
{mria, core_nodes, CoreNodes},
{mria, node_role, Role},
{gen_rpc, tcp_server_port, BaseGenRpcPort + Number},
{gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
| Env
],
number => Number,
role => Role
}
|| {{Role, Name, Env}, Number} <- Specs
].
start_apps(Node) ->
Handler = fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ok;
(_) ->
ok
end,
{Node, ok} =
{Node, rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], Handler])},
ok.
stop_apps(Node) ->
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]).
join_cluster(#{node := Node, join_to := JoinTo}) ->
case rpc:call(Node, ekka, join, [JoinTo]) of
ok -> ok;
ignore -> ok;
Err -> error({failed_to_join_cluster, #{node => Node, error => Err}})
end.
start_slave(#{node := Node, env := Env}) ->
%% We want VMs to only occupy a single core
CommonBeamOpts =
"+S 1:1 " ++
%% redirect logs to the master test node
" -master " ++ atom_to_list(node()) ++ " ",
%% We use `ct_slave' instead of `slave' because, in
%% `t_copy_conf_override_on_restarts', the nodes might be stuck
%% some time during boot up, and `slave' has a hard-coded boot
%% timeout.
{ok, Node} = ct_slave:start(
Node,
[
{erl_flags, CommonBeamOpts ++ ebin_path()},
{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 30_000},
{startup_timeout, 30_000}
]
),
%% Load apps before setting the enviroment variables to avoid
%% overriding the environment during app start:
[rpc:call(Node, application, load, [App]) || App <- [gen_rpc]],
%% Disable gen_rpc listener by default:
Env1 = [{gen_rpc, tcp_server_port, false} | Env],
setenv(Node, Env1),
ok = start_apps(Node),
Node.
expand_node_specs(Specs, CommonEnv) ->
lists:map(
fun({Spec, Num}) ->
{
case Spec of
core ->
{core, gen_node_name(Num), CommonEnv};
replicant ->
{replicant, gen_node_name(Num), CommonEnv};
{Role, Name} when is_atom(Name) ->
{Role, Name, CommonEnv};
{Role, Env} when is_list(Env) ->
{Role, gen_node_name(Num), CommonEnv ++ Env};
{Role, Name, Env} ->
{Role, Name, CommonEnv ++ Env}
end,
Num
}
end,
Specs
).
setenv(Node, Env) ->
[rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env].
teardown_cluster(Specs) ->
Nodes = [I || #{node := I} <- Specs],
[rpc:call(I, emqx_common_test_helpers, stop_apps, [emqx_conf]) || I <- Nodes],
[stop_slave(I) || I <- Nodes],
ok.
stop_slave(Node) ->
ct_slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
node_id(Name) ->
list_to_atom(lists:concat([Name, "@", host()])).
gen_node_name(N) ->
list_to_atom("n" ++ integer_to_list(N)).
ebin_path() ->
string:join(["-pa" | paths()], " ").
paths() ->
[
Path
|| Path <- code:get_path(),
string:prefix(Path, code:lib_dir()) =:= nomatch,
string:str(Path, "_build/default/plugins") =:= 0
].