From 332daabcc53a7f1f171cc9ae47fcf72328a9523f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 31 May 2023 14:44:56 +0200 Subject: [PATCH] fix(config): Do not sync cluster config from nodes running new version --- apps/emqx_conf/src/emqx_conf_app.erl | 192 ++++++++++++-------- apps/emqx_conf/test/emqx_conf_app_SUITE.erl | 39 ++++ 2 files changed, 159 insertions(+), 72 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 70234b525..363e367b6 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -42,6 +42,8 @@ start(_StartType, _StartArgs) -> stop(_State) -> ok. +%% Read the cluster config from the local node. +%% This function is named 'override' due to historical reasons. get_override_config_file() -> Node = node(), case emqx_app:get_init_config_load_done() of @@ -63,7 +65,7 @@ get_override_config_file() -> tnx_id => TnxId, node => Node, has_deprecated_file => HasDeprecateFile, - release => emqx_app:get_release() + release => emqx_release:version_with_prefix() } end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of @@ -95,7 +97,7 @@ init_conf() -> %% Workaround for https://github.com/emqx/mria/issues/94: _ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000), _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), - {ok, TnxId} = copy_override_conf_from_core_node(), + {ok, TnxId} = sync_cluster_conf(), _ = emqx_app:set_init_tnx_id(TnxId), ok = init_load(), ok = emqx_app:set_init_config_load_done(). @@ -103,88 +105,134 @@ init_conf() -> cluster_nodes() -> mria:cluster_nodes(cores) -- [node()]. -copy_override_conf_from_core_node() -> +%% @doc Try to sync the cluster config from other core nodes. +sync_cluster_conf() -> case cluster_nodes() of - %% The first core nodes is self. [] -> - ?SLOG(debug, #{msg => "skip_copy_override_conf_from_core_node"}), + %% The first core nodes is self. 
+ ?SLOG(debug, #{ + msg => "skip_sync_cluster_conf", reason => "Running single node" + }), {ok, ?DEFAULT_INIT_TXN_ID}; Nodes -> - {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), - {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), - NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), - case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of + sync_cluster_conf2(Nodes) + end. + +%% @priv Some core nodes are running, try to sync the cluster config from them. +sync_cluster_conf2(Nodes) -> + {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), + {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), + NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), + case (Failed =/= [] orelse NotReady =/= []) of + true when Ready =/= [] -> + %% Some core nodes failed to reply. + Warning = #{ + nodes => Nodes, + failed => Failed, + not_ready => NotReady, + msg => "ignored_nodes_when_sync_cluster_conf" + }, + ?SLOG(warning, Warning); + true -> + %% There are core nodes running but no one was able to reply. + ?SLOG(error, #{ + msg => "failed_to_sync_cluster_conf", + nodes => Nodes, + failed => Failed, + not_ready => NotReady + }); + false -> + ok + end, + case Ready of + [] -> + case should_proceed_with_boot() of true -> - Warning = #{ - nodes => Nodes, - failed => Failed, - not_ready => NotReady, - msg => "ignored_bad_nodes_when_copy_init_config" - }, - ?SLOG(warning, Warning); - false -> - ok - end, - case Ready of - [] -> - %% Other core nodes running but no one replicated it successfully. - ?SLOG(error, #{ - msg => "copy_override_conf_from_core_node_failed", + %% Act as if this node is alone, so it can + %% finish the boot sequence and load the + %% config for other nodes to copy it. 
+ ?SLOG(info, #{ + msg => "skip_sync_cluster_conf", + loading_from_disk => true, nodes => Nodes, failed => Failed, not_ready => NotReady }), - - case should_proceed_with_boot() of - true -> - %% Act as if this node is alone, so it can - %% finish the boot sequence and load the - %% config for other nodes to copy it. - ?SLOG(info, #{ - msg => "skip_copy_override_conf_from_core_node", - loading_from_disk => true, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - {ok, ?DEFAULT_INIT_TXN_ID}; - false -> - %% retry in some time - Jitter = rand:uniform(2000), - Timeout = 10000 + Jitter, - ?SLOG(info, #{ - msg => "copy_cluster_conf_from_core_node_retry", - timeout => Timeout, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - timer:sleep(Timeout), - copy_override_conf_from_core_node() - end; - _ -> - [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), - #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, - HasDeprecatedFile = has_deprecated_file(Info), - ?SLOG(debug, #{ - msg => "copy_cluster_conf_from_core_node_success", - node => Node, - has_deprecated_file => HasDeprecatedFile, - local_release => emqx_app:get_release(), - remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"), - data_dir => emqx:data_dir(), - tnx_id => TnxId + {ok, ?DEFAULT_INIT_TXN_ID}; + false -> + %% retry in some time + Jitter = rand:uniform(2000), + Timeout = 10000 + Jitter, + timer:sleep(Timeout), + ?SLOG(warning, #{ + msg => "sync_cluster_conf_retry", + timeout => Timeout, + nodes => Nodes, + failed => Failed, + not_ready => NotReady }), - ok = emqx_config:save_to_override_conf( - HasDeprecatedFile, - RawOverrideConf, - #{override_to => cluster} - ), - ok = sync_data_from_node(Node), - {ok, TnxId} - end + sync_cluster_conf() + end; + _ -> + sync_cluster_conf3(Ready) end. +%% @priv Filter out the nodes which are running a newer version than this node. 
+sync_cluster_conf3(Ready) ->
+    %% `Ready' is the list of `{ok, Info}' replies collected from peer core nodes.
+    %% Replies from peers running a newer release than this node are dropped, so
+    %% that this node never copies a cluster config written by a newer version.
+    case lists:filter(fun is_not_newer_release/1, Ready) of
+        [] ->
+            %% All available core nodes are running a newer version than this node.
+            %% Start this node without syncing cluster config from them.
+            %% This is likely a restart of an older version node during cluster upgrade.
+            %%
+            %% Every reply was rejected by is_not_newer_release/1, which can only
+            %% reject replies carrying a `release' key, so matching on `release'
+            %% below cannot fail.
+            NodesAndVersions = lists:map(
+                fun({ok, #{node := Node, release := Release}}) ->
+                    #{node => Node, version => Release}
+                end,
+                Ready
+            ),
+            ?SLOG(warning, #{
+                msg => "all_available_nodes_running_newer_version",
+                hint => "Booting this node without syncing cluster config from peer core nodes",
+                peer_nodes => NodesAndVersions
+            }),
+            {ok, ?DEFAULT_INIT_TXN_ID};
+        Ready2 ->
+            sync_cluster_conf4(Ready2)
+    end.
+
+%% @priv Return `true' when the peer reply may be used as a source for the
+%% cluster config, i.e. the peer is NOT running a newer release than this node.
+is_not_newer_release({ok, #{release := RemoteRelease}}) ->
+    try
+        emqx_release:vsn_compare(RemoteRelease) =/= newer
+    catch
+        _:_ ->
+            %% If the version is not valid (without v or e prefix),
+            %% we know it's older than v5.1.0/e5.1.0
+            true
+    end;
+is_not_newer_release({ok, #{}}) ->
+    %% The reply carries no `release' key at all. sync_cluster_conf4/1 already
+    %% tolerates this (it logs "before_v5.0.24|e5.0.3" as the remote release),
+    %% so such a peer is presumably an old release and must not crash the
+    %% filter with a function_clause error: keep it as a sync candidate.
+    true.
+
+%% @priv Some core nodes are running and replied with their configs successfully.
+%% Try to sort the results and save the first one for local use.
+%% Returns `{ok, TnxId}' where `TnxId' is taken from the selected reply.
+sync_cluster_conf4(Ready) ->
+    %% conf_sort/2 decides which reply comes first; only the head is used.
+    [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
+    #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
+    HasDeprecatedFile = has_deprecated_file(Info),
+    ?SLOG(debug, #{
+        msg => "sync_cluster_conf_success",
+        synced_from_node => Node,
+        has_deprecated_file => HasDeprecatedFile,
+        local_release => emqx_app:get_release(),
+        %% `release' may be absent from the reply, hence the default.
+        remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"),
+        data_dir => emqx:data_dir(),
+        tnx_id => TnxId
+    }),
+    %% Assertive matches: any failure to persist the config or to sync data
+    %% from the selected node crashes here instead of being ignored.
+    ok = emqx_config:save_to_override_conf(
+        HasDeprecatedFile,
+        RawOverrideConf,
+        #{override_to => cluster}
+    ),
+    ok = sync_data_from_node(Node),
+    {ok, TnxId}.
+ should_proceed_with_boot() -> TablesStatus = emqx_cluster_rpc:get_tables_status(), LocalNode = node(), diff --git a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl index 583405158..95b4ce697 100644 --- a/apps/emqx_conf/test/emqx_conf_app_SUITE.erl +++ b/apps/emqx_conf/test/emqx_conf_app_SUITE.erl @@ -98,6 +98,34 @@ t_copy_deprecated_data_dir(_Config) -> stop_cluster(Nodes) end. +t_no_copy_from_newer_version_node(_Config) -> + net_kernel:start(['master2@127.0.0.1', longnames]), + ct:timetrap({seconds, 120}), + snabbkaffe:fix_ct_logging(), + Cluster = cluster([cluster_spec({core, 10}), cluster_spec({core, 11}), cluster_spec({core, 12})]), + OKs = [ok, ok, ok], + [First | Rest] = Nodes = start_cluster(Cluster), + try + File = "/configs/cluster.hocon", + assert_config_load_done(Nodes), + rpc:call(First, ?MODULE, create_data_dir, [File]), + {OKs, []} = rpc:multicall(Nodes, application, stop, [emqx_conf]), + {OKs, []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []), + {OKs, []} = rpc:multicall(Nodes, meck, new, [ + emqx_release, [passthrough, no_history, no_link, non_strict] + ]), + %% 99.9.9 is always newer than the current version + {OKs, []} = rpc:multicall(Nodes, meck, expect, [ + emqx_release, version_with_prefix, 0, "e99.9.9" + ]), + ok = rpc:call(First, application, start, [emqx_conf]), + {[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]), + ok = assert_no_cluster_conf_copied(Rest, File), + stop_cluster(Nodes), + ok + after + stop_cluster(Nodes) + end. %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ @@ -158,6 +186,17 @@ assert_data_copy_done([First0 | Rest], File) -> Rest ). 
+assert_no_cluster_conf_copied([], _) -> + ok; +assert_no_cluster_conf_copied([Node | Nodes], File) -> + NodeStr = atom_to_list(Node), + ?assertEqual( + {error, enoent}, + file:read_file(NodeStr ++ File), + #{node => Node} + ), + assert_no_cluster_conf_copied(Nodes, File). + assert_config_load_done(Nodes) -> lists:foreach( fun(Node) ->