diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl
index e3b824d75..5be474601 100644
--- a/apps/emqx_conf/src/emqx_cluster_rpc.erl
+++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl
@@ -42,6 +42,8 @@
     code_change/3
 ]).
 
+-export([get_tables_status/0]).
+
 -export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
 
 -ifdef(TEST).
@@ -172,6 +174,38 @@ get_node_tnx_id(Node) ->
         [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
     end.
 
+%% Reports, for each Mnesia table used by this module, whether it is
+%% still waiting to be loaded and from where:
+%%   * `{disc, Node}': the table's `load_node' is the local node;
+%%   * `{network, Node}': the table's `load_node' is another node;
+%%   * `{waiting, Nodes}': `load_node' is still `unknown'. `Nodes' is the
+%%     table's `all_nodes' minus the local node and the nodes returned by
+%%     `mnesia_recover:get_mnesia_downs/0'.
+-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}.
+get_tables_status() ->
+    maps:from_list([
+        {Tab, do_get_tables_status(Tab)}
+     || Tab <- [?CLUSTER_COMMIT, ?CLUSTER_MFA]
+    ]).
+
+%% Load status of a single table, in the shape described for
+%% `get_tables_status/0'.
+do_get_tables_status(Tab) ->
+    Props = mnesia:table_info(Tab, all),
+    LocalNode = node(),
+    case proplists:get_value(load_node, Props) of
+        unknown ->
+            %% The node lists are only needed while the table is waiting,
+            %% so they are looked up here rather than on every call.
+            TabNodes = proplists:get_value(all_nodes, Props),
+            KnownDown = mnesia_recover:get_mnesia_downs(),
+            {waiting, TabNodes -- [LocalNode | KnownDown]};
+        LocalNode ->
+            {disc, LocalNode};
+        Node ->
+            {network, Node}
+    end.
+
 %% Regardless of what MFA is returned, consider it a success),
 %% then move to the next tnxId.
 %% if the next TnxId failed, need call the function again to skip.
diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl
index 81d0481df..2115bc617 100644
--- a/apps/emqx_conf/src/emqx_conf_app.erl
+++ b/apps/emqx_conf/src/emqx_conf_app.erl
@@ -31,64 +31,6 @@ start(_StartType, _StartArgs) ->
 stop(_State) ->
     ok.
 
-%% internal functions
-init_conf() ->
-    {ok, TnxId} = copy_override_conf_from_core_node(),
-    emqx_app:set_init_tnx_id(TnxId),
-    emqx_config:init_load(emqx_conf:schema_module()),
-    emqx_app:set_init_config_load_done().
-
-copy_override_conf_from_core_node() ->
-    case mria_mnesia:running_nodes() -- [node()] of
-        %% The first core nodes is self.
-        [] ->
-            ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
-            {ok, -1};
-        Nodes ->
-            {Results, Failed} = emqx_conf_proto_v1:get_override_config_file(Nodes),
-            {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
-            NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
-            case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
-                true ->
-                    Warning = #{
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady,
-                        msg => "ignored_bad_nodes_when_copy_init_config"
-                    },
-                    ?SLOG(warning, Warning);
-                false ->
-                    ok
-            end,
-            case Ready of
-                [] ->
-                    %% Other core nodes running but no one replicated it successfully.
-                    ?SLOG(error, #{
-                        msg => "copy_overide_conf_from_core_node_failed",
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady
-                    }),
-                    {error, "core node not ready"};
-                _ ->
-                    SortFun = fun(
-                        {ok, #{wall_clock := W1}},
-                        {ok, #{wall_clock := W2}}
-                    ) ->
-                        W1 > W2
-                    end,
-                    [{ok, Info} | _] = lists:sort(SortFun, Ready),
-                    #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
-                    Msg = #{msg => "copy_overide_conf_from_core_node_success", node => Node},
-                    ?SLOG(debug, Msg),
-                    ok = emqx_config:save_to_override_conf(
-                        RawOverrideConf,
-                        #{override_to => cluster}
-                    ),
-                    {ok, TnxId}
-            end
-    end.
-
 get_override_config_file() ->
     Node = node(),
     Role = mria_rlog:role(),
@@ -114,3 +56,102 @@ get_override_config_file() ->
         true when Role =:= replicant ->
             {ignore, #{node => Node}}
     end.
+
+%% ------------------------------------------------------------------------------
+%% Internal functions
+%% ------------------------------------------------------------------------------
+
+%% Copies the cluster override config (see
+%% copy_override_conf_from_core_node/0), records the transaction id it
+%% was taken at, loads the config and flags the initial load as done.
+init_conf() ->
+    {ok, TnxId} = copy_override_conf_from_core_node(),
+    emqx_app:set_init_tnx_id(TnxId),
+    emqx_config:init_load(emqx_conf:schema_module()),
+    emqx_app:set_init_config_load_done().
+
+%% Running nodes reported by `ekka_cluster:info/0', without the local node.
+cluster_nodes() ->
+    maps:get(running_nodes, ekka_cluster:info()) -- [node()].
+
+%% Asks the other running nodes for their cluster override config and
+%% saves the one with the greatest `wall_clock'. Returns `{ok, TnxId}':
+%% `TnxId' comes from the node the config was copied from, or is
+%% `?DEFAULT_INIT_TXN_ID' when nothing was copied.
+copy_override_conf_from_core_node() ->
+    case cluster_nodes() of
+        [] ->
+            %% No other node is running: there is nothing to copy.
+            ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
+            {ok, ?DEFAULT_INIT_TXN_ID};
+        Nodes ->
+            {Results, Failed} = emqx_conf_proto_v1:get_override_config_file(Nodes),
+            {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
+            NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
+            copy_override_conf_from_replies(Ready, Nodes, Failed, NotReady)
+    end.
+
+%% Acts on the replies collected by copy_override_conf_from_core_node/0.
+copy_override_conf_from_replies([], Nodes, Failed, NotReady) ->
+    %% Other nodes are running, but none of them returned its config.
+    ?SLOG(error, #{
+        msg => "copy_override_conf_from_core_node_failed",
+        nodes => Nodes,
+        failed => Failed,
+        not_ready => NotReady
+    }),
+    case should_proceed_with_boot() of
+        true ->
+            %% Act as if this node is alone, so it can finish the boot
+            %% sequence and load the config for other nodes to copy it.
+            ?SLOG(info, #{
+                msg => "skip_copy_overide_conf_from_core_node",
+                loading_from_disk => true,
+                nodes => Nodes,
+                failed => Failed,
+                not_ready => NotReady
+            }),
+            {ok, ?DEFAULT_INIT_TXN_ID};
+        false ->
+            %% Retry after 10 s plus up to 2 s of random jitter.
+            Jitter = rand:uniform(2_000),
+            Timeout = 10_000 + Jitter,
+            ?SLOG(info, #{
+                msg => "copy_overide_conf_from_core_node_retry",
+                timeout => Timeout,
+                nodes => Nodes,
+                failed => Failed,
+                not_ready => NotReady
+            }),
+            timer:sleep(Timeout),
+            copy_override_conf_from_core_node()
+    end;
+copy_override_conf_from_replies([_ | _] = Ready, Nodes, Failed, NotReady) ->
+    case Failed =/= [] orelse NotReady =/= [] of
+        true ->
+            ?SLOG(warning, #{
+                nodes => Nodes,
+                failed => Failed,
+                not_ready => NotReady,
+                msg => "ignored_bad_nodes_when_copy_init_config"
+            });
+        false ->
+            ok
+    end,
+    %% Sort by descending `wall_clock' and copy from the head of the list.
+    SortFun = fun({ok, #{wall_clock := W1}}, {ok, #{wall_clock := W2}}) -> W1 > W2 end,
+    [{ok, Info} | _] = lists:sort(SortFun, Ready),
+    #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
+    ?SLOG(debug, #{msg => "copy_overide_conf_from_core_node_success", node => Node}),
+    ok = emqx_config:save_to_override_conf(RawOverrideConf, #{override_to => cluster}),
+    {ok, TnxId}.
+
+%% Tells whether this node should continue booting without a copied
+%% config: true only when emqx_cluster_rpc:get_tables_status/0 reports
+%% `{disc, node()}' for ?CLUSTER_COMMIT, i.e. the table is loaded locally
+%% and this node can finish booting so that others copy the config from
+%% it. Any other status (loading from another node, or still waiting for
+%% nodes to be up) yields false, and the caller tries again.
+should_proceed_with_boot() ->
+    TablesStatus = emqx_cluster_rpc:get_tables_status(),
+    maps:get(?CLUSTER_COMMIT, TablesStatus) =:= {disc, node()}.
diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl
new file mode 100644
index 000000000..29b1404e8
--- /dev/null
+++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl
@@ -0,0 +1,241 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_conf_app_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx_conf.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% Stops a three-core cluster and restarts it node by node, then checks
+%% that every node reports its initial config load as done.
+t_copy_conf_override_on_restarts(_Config) ->
+    ct:timetrap({seconds, 120}),
+    snabbkaffe:fix_ct_logging(),
+    Cluster = cluster([core, core, core]),
+    try
+        %% 1. Start all nodes
+        Nodes = start_cluster(Cluster),
+        [join_cluster(Spec) || Spec <- Cluster],
+        assert_config_load_done(Nodes),
+
+        %% 2. Stop each in order.
+        lists:foreach(fun stop_slave/1, Nodes),
+
+        %% 3. Restart nodes in the same order.  This should not
+        %% crash and eventually all nodes should be ready.
+        start_cluster_async(Cluster),
+
+        timer:sleep(15_000),
+
+        assert_config_load_done(Nodes),
+
+        ok
+    after
+        teardown_cluster(Cluster)
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper functions
+%%------------------------------------------------------------------------------
+
+%% Asserts that every node in Nodes reports its initial config load as done.
+assert_config_load_done(Nodes) ->
+    lists:foreach(
+        fun(Node) ->
+            Done = rpc:call(Node, emqx_app, get_init_config_load_done, []),
+            ?assert(Done, #{node => Node})
+        end,
+        Nodes
+    ).
+
+%% Starts the nodes one at a time; returns the list of started nodes.
+start_cluster(Specs) ->
+    [start_slave(I) || I <- Specs].
+
+%% Starts each node from its own linked process, 7 seconds apart, without
+%% waiting for the previous node to finish starting.
+start_cluster_async(Specs) ->
+    [
+        begin
+            spawn_link(fun() -> start_slave(I) end),
+            timer:sleep(7_000)
+        end
+     || I <- Specs
+    ].
+
+cluster(Specs) ->
+    cluster(Specs, []).
+
+%% Turns a list of node specs into the maps consumed by start_slave/1 and
+%% join_cluster/1. Node number N listens for gen_rpc on port 9000 + N, and
+%% every node is told to join the first core node, when there is one.
+cluster(Specs0, CommonEnv) ->
+    Specs1 = lists:zip(Specs0, lists:seq(1, length(Specs0))),
+    Specs = expand_node_specs(Specs1, CommonEnv),
+    CoreNodes = [node_id(Name) || {{core, Name, _}, _} <- Specs],
+    %% Assign gen_rpc ports:
+    BaseGenRpcPort = 9000,
+    GenRpcPorts = maps:from_list([
+        {node_id(Name), {tcp, BaseGenRpcPort + Num}}
+     || {{_, Name, _}, Num} <- Specs
+    ]),
+    %% Set the default node of the cluster:
+    JoinTo =
+        case CoreNodes of
+            [First | _] -> #{join_to => First};
+            _ -> #{}
+        end,
+    [
+        JoinTo#{
+            name => Name,
+            node => node_id(Name),
+            env => [
+                {mria, core_nodes, CoreNodes},
+                {mria, node_role, Role},
+                {gen_rpc, tcp_server_port, BaseGenRpcPort + Number},
+                {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
+                | Env
+            ],
+            number => Number,
+            role => Role
+        }
+     || {{Role, Name, Env}, Number} <- Specs
+    ].
+
+%% Starts emqx_conf on Node; the handler clears emqx's boot_modules first.
+start_apps(Node) ->
+    Handler = fun
+        (emqx) ->
+            application:set_env(emqx, boot_modules, []),
+            ok;
+        (_) ->
+            ok
+    end,
+    %% Node is part of the match so that it shows up in a badmatch error.
+    {Node, ok} =
+        {Node, rpc:call(Node, emqx_common_test_helpers, start_apps, [[emqx_conf], Handler])},
+    ok.
+
+stop_apps(Node) ->
+    ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_conf]]).
+
+join_cluster(#{node := Node, join_to := JoinTo}) ->
+    case rpc:call(Node, ekka, join, [JoinTo]) of
+        ok -> ok;
+        ignore -> ok;
+        Err -> error({failed_to_join_cluster, #{node => Node, error => Err}})
+    end.
+
+start_slave(#{node := Node, env := Env}) ->
+    %% We want VMs to only occupy a single core
+    CommonBeamOpts =
+        "+S 1:1 " ++
+            %% redirect logs to the master test node
+            " -master " ++ atom_to_list(node()) ++ " ",
+    %% We use `ct_slave' instead of `slave' because, in
+    %% `t_copy_conf_override_on_restarts', the nodes might be stuck
+    %% some time during boot up, and `slave' has a hard-coded boot
+    %% timeout.
+    {ok, Node} = ct_slave:start(
+        Node,
+        [
+            {erl_flags, CommonBeamOpts ++ ebin_path()},
+            {kill_if_fail, true},
+            {monitor_master, true},
+            {init_timeout, 30_000},
+            {startup_timeout, 30_000}
+        ]
+    ),
+
+    %% Load apps before setting the environment variables to avoid
+    %% overriding the environment during app start:
+    [rpc:call(Node, application, load, [App]) || App <- [gen_rpc]],
+    %% Disable gen_rpc listener by default:
+    Env1 = [{gen_rpc, tcp_server_port, false} | Env],
+    setenv(Node, Env1),
+    ok = start_apps(Node),
+    Node.
+
+%% Normalises every {Spec, Num} pair into {{Role, Name, Env}, Num}.
+expand_node_specs(Specs, CommonEnv) ->
+    lists:map(
+        fun({Spec, Num}) ->
+            {
+                case Spec of
+                    core ->
+                        {core, gen_node_name(Num), CommonEnv};
+                    replicant ->
+                        {replicant, gen_node_name(Num), CommonEnv};
+                    {Role, Name} when is_atom(Name) ->
+                        {Role, Name, CommonEnv};
+                    {Role, Env} when is_list(Env) ->
+                        {Role, gen_node_name(Num), CommonEnv ++ Env};
+                    {Role, Name, Env} ->
+                        {Role, Name, CommonEnv ++ Env}
+                end,
+                Num
+            }
+        end,
+        Specs
+    ).
+
+setenv(Node, Env) ->
+    [rpc:call(Node, application, set_env, [App, Key, Val]) || {App, Key, Val} <- Env].
+
+%% Cleanup is best effort: the results of the remote calls are not checked.
+teardown_cluster(Specs) ->
+    Nodes = [I || #{node := I} <- Specs],
+    %% The argument list holds a single argument, the list of apps to
+    %% stop, exactly as in stop_apps/1.
+    [rpc:call(I, emqx_common_test_helpers, stop_apps, [[emqx_conf]]) || I <- Nodes],
+    [stop_slave(I) || I <- Nodes],
+    ok.
+
+stop_slave(Node) ->
+    ct_slave:stop(Node).
+
+%% Host part of the test node's name.
+host() ->
+    [_, Host] = string:lexemes(atom_to_list(node()), "@"),
+    Host.
+
+node_id(Name) ->
+    list_to_atom(lists:concat([Name, "@", host()])).
+
+gen_node_name(N) ->
+    list_to_atom("n" ++ integer_to_list(N)).
+
+%% A `-pa' flag followed by the code paths returned by paths/0.
+ebin_path() ->
+    string:join(["-pa" | paths()], " ").
+
+%% Code path entries of this node that are neither under code:lib_dir()
+%% nor inside "_build/default/plugins".
+paths() ->
+    [
+        Path
+     || Path <- code:get_path(),
+        string:prefix(Path, code:lib_dir()) =:= nomatch,
+        string:find(Path, "_build/default/plugins") =:= nomatch
+    ].