diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index e3b824d75..5be474601 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -42,6 +42,8 @@ code_change/3 ]). +-export([get_tables_status/0]). + -export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). -ifdef(TEST). @@ -172,6 +174,29 @@ get_node_tnx_id(Node) -> [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId end. +%% Checks whether the Mnesia tables used by this module are waiting to +%% be loaded and from where. +-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}. +get_tables_status() -> + maps:from_list([ + {Tab, do_get_tables_status(Tab)} + || Tab <- [?CLUSTER_COMMIT, ?CLUSTER_MFA] + ]). + +do_get_tables_status(Tab) -> + Props = mnesia:table_info(Tab, all), + TabNodes = proplists:get_value(all_nodes, Props), + KnownDown = mnesia_recover:get_mnesia_downs(), + LocalNode = node(), + case proplists:get_value(load_node, Props) of + unknown -> + {waiting, TabNodes -- [LocalNode | KnownDown]}; + LocalNode -> + {disc, LocalNode}; + Node -> + {network, Node} + end. + %% Regardless of what MFA is returned, consider it a success), %% then move to the next tnxId. %% if the next TnxId failed, need call the function again to skip. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 81d0481df..1ad460043 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -24,6 +24,8 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). +-define(DEFAULT_INIT_TXN_ID, -1). + start(_StartType, _StartArgs) -> init_conf(), emqx_conf_sup:start_link(). @@ -31,19 +33,48 @@ start(_StartType, _StartArgs) -> stop(_State) -> ok. -%% internal functions +get_override_config_file() -> + Node = node(), + case emqx_app:get_init_config_load_done() of + false -> + {error, #{node => Node, msg => "init_conf_load_not_done"}}; + true -> + case erlang:whereis(emqx_config_handler) of + undefined -> + {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + _ -> + Fun = fun() -> + TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), + WallClock = erlang:statistics(wall_clock), + Conf = emqx_config_handler:get_raw_cluster_override_conf(), + #{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node} + end, + case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, #{node => Node, msg => Reason}} + end + end + end. + +%% ------------------------------------------------------------------------------ +%% Internal functions +%% ------------------------------------------------------------------------------ + init_conf() -> {ok, TnxId} = copy_override_conf_from_core_node(), emqx_app:set_init_tnx_id(TnxId), emqx_config:init_load(emqx_conf:schema_module()), emqx_app:set_init_config_load_done(). +cluster_nodes() -> + maps:get(running_nodes, ekka_cluster:info()) -- [node()]. + copy_override_conf_from_core_node() -> - case mria_mnesia:running_nodes() -- [node()] of + case cluster_nodes() of %% The first core nodes is self. [] -> ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), - {ok, -1}; + {ok, ?DEFAULT_INIT_TXN_ID}; Nodes -> {Results, Failed} = emqx_conf_proto_v1:get_override_config_file(Nodes), {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), @@ -64,12 +95,39 @@ copy_override_conf_from_core_node() -> [] -> %% Other core nodes running but no one replicated it successfully. ?SLOG(error, #{ - msg => "copy_overide_conf_from_core_node_failed", + msg => "copy_override_conf_from_core_node_failed", nodes => Nodes, failed => Failed, not_ready => NotReady }), - {error, "core node not ready"}; + + case should_proceed_with_boot() of + true -> + %% Act as if this node is alone, so it can + %% finish the boot sequence and load the + %% config for other nodes to copy it. + ?SLOG(info, #{ + msg => "skip_copy_overide_conf_from_core_node", + loading_from_disk => true, + nodes => Nodes, + failed => Failed, + not_ready => NotReady + }), + {ok, ?DEFAULT_INIT_TXN_ID}; + false -> + %% retry in some time + Jitter = rand:uniform(2_000), + Timeout = 10_000 + Jitter, + ?SLOG(info, #{ + msg => "copy_overide_conf_from_core_node_retry", + timeout => Timeout, + nodes => Nodes, + failed => Failed, + not_ready => NotReady + }), + timer:sleep(Timeout), + copy_override_conf_from_core_node() + end; _ -> SortFun = fun( {ok, #{wall_clock := W1}}, @@ -79,7 +137,10 @@ copy_override_conf_from_core_node() -> end, [{ok, Info} | _] = lists:sort(SortFun, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, - Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node}, + Msg = #{ + msg => "copy_overide_conf_from_core_node_success", + node => Node + }, ?SLOG(debug, Msg), ok = emqx_config:save_to_override_conf( RawOverrideConf, @@ -89,28 +150,16 @@ copy_override_conf_from_core_node() -> end end. -get_override_config_file() -> - Node = node(), - Role = mria_rlog:role(), - case emqx_app:get_init_config_load_done() of - false -> - {error, #{node => Node, msg => "init_conf_load_not_done"}}; - true when Role =:= core -> - case erlang:whereis(emqx_config_handler) of - undefined -> - {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; - _ -> - Fun = fun() -> - TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), - WallClock = erlang:statistics(wall_clock), - Conf = emqx_config_handler:get_raw_cluster_override_conf(), - #{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node} - end, - case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of - {atomic, Res} -> {ok, Res}; - {aborted, Reason} -> {error, #{node => Node, msg => Reason}} - end - end; - true when Role =:= replicant -> - {ignore, #{node => Node}} +should_proceed_with_boot() -> + TablesStatus = emqx_cluster_rpc:get_tables_status(), + LocalNode = node(), + case maps:get(?CLUSTER_COMMIT, TablesStatus) of + {disc, LocalNode} -> + %% Loading locally; let this node finish its boot sequence + %% so others can copy the config from this one. + true; + _ -> + %% Loading from another node or still waiting for nodes to + %% be up. Try again. + false end. diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl new file mode 100644 index 000000000..29b1404e8 --- /dev/null +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -0,0 +1,222 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_app_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_conf.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +t_copy_conf_override_on_restarts(_Config) -> + ct:timetrap({seconds, 120}), + snabbkaffe:fix_ct_logging(), + Cluster = cluster([core, core, core]), + try + %% 1. Start all nodes + Nodes = start_cluster(Cluster), + [join_cluster(Spec) || Spec <- Cluster], + assert_config_load_done(Nodes), + + %% 2. Stop each in order. + lists:foreach(fun stop_slave/1, Nodes), + + %% 3. Restart nodes in the same order. This should not + %% crash and eventually all nodes should be ready. + start_cluster_async(Cluster), + + timer:sleep(15_000), + + assert_config_load_done(Nodes), + + ok + after + teardown_cluster(Cluster) + end. + +%%------------------------------------------------------------------------------ +%% Helper functions +%%------------------------------------------------------------------------------ + +assert_config_load_done(Nodes) -> + lists:foreach( + fun(Node) -> + Done = rpc:call(Node, emqx_app, get_init_config_load_done, []), + ?assert(Done, #{node => Node}) + end, + Nodes + ). + +start_cluster(Specs) -> + [start_slave(I) || I <- Specs]. + +start_cluster_async(Specs) -> + [ + begin + spawn_link(fun() -> start_slave(I) end), + timer:sleep(7_000) + end + || I <- Specs + ]. + +cluster(Specs) -> + cluster(Specs, []). + +cluster(Specs0, CommonEnv) -> + Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))), + Specs = expand_node_specs(Specs1, CommonEnv), + CoreNodes = [node_id(Name) || {{core, Name, _}, _} <- Specs], + %% Assign grpc ports: + BaseGenRpcPort = 9000, + GenRpcPorts = maps:from_list([ + {node_id(Name), {tcp, BaseGenRpcPort + Num}} + || {{_, Name, _}, Num} <- Specs + ]), + %% Set the default node of the cluster: + JoinTo = + case CoreNodes of + [First | _] -> #{join_to => First}; + _ -> #{} + end, + [ + JoinTo#{ + name => Name, + node => node_id(Name), + env => [ + {mria, core_nodes, CoreNodes}, + {mria, node_role, Role}, + {gen_rpc, tcp_server_port, BaseGenRpcPort + Number}, + {gen_rpc, client_config_per_node, {internal, GenRpcPorts}} + | Env + ], + number => Number, + role => Role + } + || {{Role, Name, Env}, Number} <- Specs + ]. + +start_apps(Node) -> + Handler = fun + (emqx) -> + application:set_env(emqx, boot_modules, []), + ok; + (_) -> + ok + end, + {Node, ok} = + {Node, rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], Handler])}, + ok. + +stop_apps(Node) -> + ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]). + +join_cluster(#{node := Node, join_to := JoinTo}) -> + case rpc:call(Node, ekka, join, [JoinTo]) of + ok -> ok; + ignore -> ok; + Err -> error({failed_to_join_cluster, #{node => Node, error => Err}}) + end. + +start_slave(#{node := Node, env := Env}) -> + %% We want VMs to only occupy a single core + CommonBeamOpts = + "+S 1:1 " ++ + %% redirect logs to the master test node + " -master " ++ atom_to_list(node()) ++ " ", + %% We use `ct_slave' instead of `slave' because, in + %% `t_copy_conf_override_on_restarts', the nodes might be stuck + %% some time during boot up, and `slave' has a hard-coded boot + %% timeout. + {ok, Node} = ct_slave:start( + Node, + [ + {erl_flags, CommonBeamOpts ++ ebin_path()}, + {kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 30_000}, + {startup_timeout, 30_000} + ] + ), + + %% Load apps before setting the enviroment variables to avoid + %% overriding the environment during app start: + [rpc:call(Node, application, load, [App]) || App <- [gen_rpc]], + %% Disable gen_rpc listener by default: + Env1 = [{gen_rpc, tcp_server_port, false} | Env], + setenv(Node, Env1), + ok = start_apps(Node), + Node. + +expand_node_specs(Specs, CommonEnv) -> + lists:map( + fun({Spec, Num}) -> + { + case Spec of + core -> + {core, gen_node_name(Num), CommonEnv}; + replicant -> + {replicant, gen_node_name(Num), CommonEnv}; + {Role, Name} when is_atom(Name) -> + {Role, Name, CommonEnv}; + {Role, Env} when is_list(Env) -> + {Role, gen_node_name(Num), CommonEnv ++ Env}; + {Role, Name, Env} -> + {Role, Name, CommonEnv ++ Env} + end, + Num + } + end, + Specs + ). + +setenv(Node, Env) -> + [rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env]. + +teardown_cluster(Specs) -> + Nodes = [I || #{node := I} <- Specs], + [rpc:call(I, emqx_common_test_helpers, stop_apps, [emqx_conf]) || I <- Nodes], + [stop_slave(I) || I <- Nodes], + ok. + +stop_slave(Node) -> + ct_slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. + +node_id(Name) -> + list_to_atom(lists:concat([Name, "@", host()])). + +gen_node_name(N) -> + list_to_atom("n" ++ integer_to_list(N)). + +ebin_path() -> + string:join(["-pa" | paths()], " "). + +paths() -> + [ + Path + || Path <- code:get_path(), + string:prefix(Path, code:lib_dir()) =:= nomatch, + string:str(Path, "_build/default/plugins") =:= 0 + ].