fix(config): Do not sync cluster config from nodes running new version

This commit is contained in:
Zaiming (Stone) Shi 2023-05-31 14:44:56 +02:00
parent 39d6f612ca
commit 332daabcc5
2 changed files with 159 additions and 72 deletions

View File

@ -42,6 +42,8 @@ start(_StartType, _StartArgs) ->
stop(_State) ->
ok.
%% Read the cluster config from the local node.
%% This function is named 'override' due to historical reasons.
get_override_config_file() ->
Node = node(),
case emqx_app:get_init_config_load_done() of
@ -63,7 +65,7 @@ get_override_config_file() ->
tnx_id => TnxId,
node => Node,
has_deprecated_file => HasDeprecateFile,
release => emqx_app:get_release()
release => emqx_release:version_with_prefix()
}
end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
@ -95,7 +97,7 @@ init_conf() ->
%% Workaround for https://github.com/emqx/mria/issues/94:
_ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
{ok, TnxId} = copy_override_conf_from_core_node(),
{ok, TnxId} = sync_cluster_conf(),
_ = emqx_app:set_init_tnx_id(TnxId),
ok = init_load(),
ok = emqx_app:set_init_config_load_done().
@ -103,88 +105,134 @@ init_conf() ->
cluster_nodes() ->
mria:cluster_nodes(cores) -- [node()].
copy_override_conf_from_core_node() ->
%% @doc Try to sync the cluster config from other core nodes.
sync_cluster_conf() ->
case cluster_nodes() of
%% The first core nodes is self.
[] ->
?SLOG(debug, #{msg => "skip_copy_override_conf_from_core_node"}),
%% The first core nodes is self.
?SLOG(debug, #{
msg => "skip_sync_cluster_conf", reason => "Running single node"
}),
{ok, ?DEFAULT_INIT_TXN_ID};
Nodes ->
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
sync_cluster_conf2(Nodes)
end.
%% @priv Some core nodes are running, try to sync the cluster config from them.
sync_cluster_conf2(Nodes) ->
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) of
true when Ready =/= [] ->
%% Some core nodes failed to reply.
Warning = #{
nodes => Nodes,
failed => Failed,
not_ready => NotReady,
msg => "ignored_nodes_when_sync_cluster_conf"
},
?SLOG(warning, Warning);
true ->
%% There are core nodes running but no one was able to reply.
?SLOG(error, #{
msg => "failed_to_sync_cluster_conf",
nodes => Nodes,
failed => Failed,
not_ready => NotReady
});
false ->
ok
end,
case Ready of
[] ->
case should_proceed_with_boot() of
true ->
Warning = #{
nodes => Nodes,
failed => Failed,
not_ready => NotReady,
msg => "ignored_bad_nodes_when_copy_init_config"
},
?SLOG(warning, Warning);
false ->
ok
end,
case Ready of
[] ->
%% Other core nodes running but no one replicated it successfully.
?SLOG(error, #{
msg => "copy_override_conf_from_core_node_failed",
%% Act as if this node is alone, so it can
%% finish the boot sequence and load the
%% config for other nodes to copy it.
?SLOG(info, #{
msg => "skip_sync_cluster_conf",
loading_from_disk => true,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
case should_proceed_with_boot() of
true ->
%% Act as if this node is alone, so it can
%% finish the boot sequence and load the
%% config for other nodes to copy it.
?SLOG(info, #{
msg => "skip_copy_override_conf_from_core_node",
loading_from_disk => true,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
{ok, ?DEFAULT_INIT_TXN_ID};
false ->
%% retry in some time
Jitter = rand:uniform(2000),
Timeout = 10000 + Jitter,
?SLOG(info, #{
msg => "copy_cluster_conf_from_core_node_retry",
timeout => Timeout,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
timer:sleep(Timeout),
copy_override_conf_from_core_node()
end;
_ ->
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
HasDeprecatedFile = has_deprecated_file(Info),
?SLOG(debug, #{
msg => "copy_cluster_conf_from_core_node_success",
node => Node,
has_deprecated_file => HasDeprecatedFile,
local_release => emqx_app:get_release(),
remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"),
data_dir => emqx:data_dir(),
tnx_id => TnxId
{ok, ?DEFAULT_INIT_TXN_ID};
false ->
%% retry in some time
Jitter = rand:uniform(2000),
Timeout = 10000 + Jitter,
timer:sleep(Timeout),
?SLOG(warning, #{
msg => "sync_cluster_conf_retry",
timeout => Timeout,
nodes => Nodes,
failed => Failed,
not_ready => NotReady
}),
ok = emqx_config:save_to_override_conf(
HasDeprecatedFile,
RawOverrideConf,
#{override_to => cluster}
),
ok = sync_data_from_node(Node),
{ok, TnxId}
end
sync_cluster_conf()
end;
_ ->
sync_cluster_conf3(Ready)
end.
%% @priv Filter out the nodes which are running a newer version than this node.
sync_cluster_conf3(Ready) ->
NotNewer = fun({ok, #{release := RemoteRelease}}) ->
try
emqx_release:vsn_compare(RemoteRelease) =/= newer
catch
_:_ ->
%% If the version is not valid (without v or e prefix),
%% we know it's older than v5.1.0/e5.1.0
true
end
end,
case lists:filter(NotNewer, Ready) of
[] ->
%% All available core nodes are running a newer version than this node.
%% Start this node without syncing cluster config from them.
%% This is likely a restart of an older version node during cluster upgrade.
NodesAndVersions = lists:map(
fun({ok, #{node := Node, release := Release}}) ->
#{node => Node, version => Release}
end,
Ready
),
?SLOG(warning, #{
msg => "all_available_nodes_running_newer_version",
hint => "Booting this node without syncing cluster config from peer core nodes",
peer_nodes => NodesAndVersions
}),
{ok, ?DEFAULT_INIT_TXN_ID};
Ready2 ->
sync_cluster_conf4(Ready2)
end.
%% @priv Some core nodes are running and replied with their configs successfully.
%% Try to sort the results and save the first one for local use.
sync_cluster_conf4(Ready) ->
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
HasDeprecatedFile = has_deprecated_file(Info),
?SLOG(debug, #{
msg => "sync_cluster_conf_success",
synced_from_node => Node,
has_deprecated_file => HasDeprecatedFile,
local_release => emqx_app:get_release(),
remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"),
data_dir => emqx:data_dir(),
tnx_id => TnxId
}),
ok = emqx_config:save_to_override_conf(
HasDeprecatedFile,
RawOverrideConf,
#{override_to => cluster}
),
ok = sync_data_from_node(Node),
{ok, TnxId}.
should_proceed_with_boot() ->
TablesStatus = emqx_cluster_rpc:get_tables_status(),
LocalNode = node(),

View File

@ -98,6 +98,34 @@ t_copy_deprecated_data_dir(_Config) ->
stop_cluster(Nodes)
end.
t_no_copy_from_newer_version_node(_Config) ->
net_kernel:start(['master2@127.0.0.1', longnames]),
ct:timetrap({seconds, 120}),
snabbkaffe:fix_ct_logging(),
Cluster = cluster([cluster_spec({core, 10}), cluster_spec({core, 11}), cluster_spec({core, 12})]),
OKs = [ok, ok, ok],
[First | Rest] = Nodes = start_cluster(Cluster),
try
File = "/configs/cluster.hocon",
assert_config_load_done(Nodes),
rpc:call(First, ?MODULE, create_data_dir, [File]),
{OKs, []} = rpc:multicall(Nodes, application, stop, [emqx_conf]),
{OKs, []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []),
{OKs, []} = rpc:multicall(Nodes, meck, new, [
emqx_release, [passthrough, no_history, no_link, non_strict]
]),
%% 99.9.9 is always newer than the current version
{OKs, []} = rpc:multicall(Nodes, meck, expect, [
emqx_release, version_with_prefix, 0, "e99.9.9"
]),
ok = rpc:call(First, application, start, [emqx_conf]),
{[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]),
ok = assert_no_cluster_conf_copied(Rest, File),
stop_cluster(Nodes),
ok
after
stop_cluster(Nodes)
end.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
@ -158,6 +186,17 @@ assert_data_copy_done([First0 | Rest], File) ->
Rest
).
assert_no_cluster_conf_copied([], _) ->
ok;
assert_no_cluster_conf_copied([Node | Nodes], File) ->
NodeStr = atom_to_list(Node),
?assertEqual(
{error, enoent},
file:read_file(NodeStr ++ File),
#{node => Node}
),
assert_no_cluster_conf_copied(Nodes, File).
assert_config_load_done(Nodes) ->
lists:foreach(
fun(Node) ->