Merge pull request #10902 from zmstone/0529-donot-copy-cluster-conf-from-newer-version
0529 donot copy cluster conf from newer version
This commit is contained in:
commit
aecea50564
|
@ -31,11 +31,11 @@
|
|||
%% NOTE: ALso make sure to follow the instructions in end of
|
||||
%% `apps/emqx/src/bpapi/README.md'
|
||||
|
||||
%% Community edition
|
||||
%% Opensource edition
|
||||
-define(EMQX_RELEASE_CE, "5.1.0-alpha.3").
|
||||
|
||||
%% Enterprise edition
|
||||
-define(EMQX_RELEASE_EE, "5.1.0-alpha.3").
|
||||
|
||||
%% the HTTP API version
|
||||
%% The HTTP API version
|
||||
-define(EMQX_API_VERSION, "5.0").
|
||||
|
|
|
@ -91,7 +91,7 @@
|
|||
-export([ensure_atom_conf_path/2]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([erase_all/0]).
|
||||
-export([erase_all/0, backup_and_write/2]).
|
||||
-endif.
|
||||
|
||||
-include("logger.hrl").
|
||||
|
@ -105,6 +105,7 @@
|
|||
-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
|
||||
|
||||
-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
|
||||
-define(MAX_KEEP_BACKUP_CONFIGS, 10).
|
||||
|
||||
-export_type([
|
||||
update_request/0,
|
||||
|
@ -597,43 +598,94 @@ save_to_config_map(Conf, RawConf) ->
|
|||
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
|
||||
save_to_override_conf(_, undefined, _) ->
|
||||
ok;
|
||||
%% TODO: Remove deprecated override conf file when 5.1
|
||||
save_to_override_conf(true, RawConf, Opts) ->
|
||||
case deprecated_conf_file(Opts) of
|
||||
undefined ->
|
||||
ok;
|
||||
FileName ->
|
||||
ok = filelib:ensure_dir(FileName),
|
||||
case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_write_override_file",
|
||||
filename => FileName,
|
||||
reason => Reason
|
||||
}),
|
||||
{error, Reason}
|
||||
end
|
||||
backup_and_write(FileName, hocon_pp:do(RawConf, #{}))
|
||||
end;
|
||||
save_to_override_conf(false, RawConf, _Opts) ->
|
||||
case cluster_hocon_file() of
|
||||
undefined ->
|
||||
ok;
|
||||
FileName ->
|
||||
ok = filelib:ensure_dir(FileName),
|
||||
case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of
|
||||
backup_and_write(FileName, hocon_pp:do(RawConf, #{}))
|
||||
end.
|
||||
|
||||
%% @private This is the same human-readable timestamp format as
|
||||
%% hocon-cli generated app.<time>.config file name.
|
||||
now_time() ->
|
||||
Ts = os:system_time(millisecond),
|
||||
{{Y, M, D}, {HH, MM, SS}} = calendar:system_time_to_local_time(Ts, millisecond),
|
||||
Res = io_lib:format(
|
||||
"~0p.~2..0b.~2..0b.~2..0b.~2..0b.~2..0b.~3..0b",
|
||||
[Y, M, D, HH, MM, SS, Ts rem 1000]
|
||||
),
|
||||
lists:flatten(Res).
|
||||
|
||||
%% @private Backup the current config to a file with a timestamp suffix and
|
||||
%% then save the new config to the config file.
|
||||
backup_and_write(Path, Content) ->
|
||||
%% this may fail, but we don't care
|
||||
%% e.g. read-only file system
|
||||
_ = filelib:ensure_dir(Path),
|
||||
TmpFile = Path ++ ".tmp",
|
||||
case file:write_file(TmpFile, Content) of
|
||||
ok ->
|
||||
backup_and_replace(Path, TmpFile);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_save_conf_file",
|
||||
hint =>
|
||||
"The updated cluster config is note saved on this node, please check the file system.",
|
||||
filename => TmpFile,
|
||||
reason => Reason
|
||||
}),
|
||||
%% e.g. read-only, it's not the end of the world
|
||||
ok
|
||||
end.
|
||||
|
||||
backup_and_replace(Path, TmpPath) ->
|
||||
Backup = Path ++ "." ++ now_time() ++ ".bak",
|
||||
case file:rename(Path, Backup) of
|
||||
ok ->
|
||||
ok = file:rename(TmpPath, Path),
|
||||
ok = prune_backup_files(Path);
|
||||
{error, enoent} ->
|
||||
%% not created yet
|
||||
ok = file:rename(TmpPath, Path);
|
||||
{error, Reason} ->
|
||||
?SLOG(warning, #{
|
||||
msg => "failed_to_backup_conf_file",
|
||||
filename => Backup,
|
||||
reason => Reason
|
||||
}),
|
||||
ok
|
||||
end.
|
||||
|
||||
prune_backup_files(Path) ->
|
||||
Files0 = filelib:wildcard(Path ++ ".*"),
|
||||
Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
|
||||
Files = lists:filter(fun(F) -> re:run(F, Re) =/= nomatch end, Files0),
|
||||
Sorted = lists:reverse(lists:sort(Files)),
|
||||
{_Keeps, Deletes} = lists:split(min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)), Sorted),
|
||||
lists:foreach(
|
||||
fun(F) ->
|
||||
case file:delete(F) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_save_conf_file",
|
||||
filename => FileName,
|
||||
?SLOG(warning, #{
|
||||
msg => "failed_to_delete_backup_conf_file",
|
||||
filename => F,
|
||||
reason => Reason
|
||||
}),
|
||||
{error, Reason}
|
||||
ok
|
||||
end
|
||||
end.
|
||||
end,
|
||||
Deletes
|
||||
).
|
||||
|
||||
add_handlers() ->
|
||||
ok = emqx_config_logger:add_handler(),
|
||||
|
|
|
@ -21,7 +21,10 @@
|
|||
edition_vsn_prefix/0,
|
||||
edition_longstr/0,
|
||||
description/0,
|
||||
version/0
|
||||
version/0,
|
||||
version_with_prefix/0,
|
||||
vsn_compare/1,
|
||||
vsn_compare/2
|
||||
]).
|
||||
|
||||
-include("emqx_release.hrl").
|
||||
|
@ -68,6 +71,10 @@ edition_vsn_prefix() ->
|
|||
edition_longstr() ->
|
||||
maps:get(edition(), ?EMQX_REL_NAME).
|
||||
|
||||
%% @doc Return the release version with prefix.
|
||||
version_with_prefix() ->
|
||||
edition_vsn_prefix() ++ version().
|
||||
|
||||
%% @doc Return the release version.
|
||||
version() ->
|
||||
case lists:keyfind(emqx_vsn, 1, ?MODULE:module_info(compile)) of
|
||||
|
@ -92,3 +99,47 @@ version() ->
|
|||
|
||||
build_vsn() ->
|
||||
maps:get(edition(), ?EMQX_REL_VSNS).
|
||||
|
||||
%% @doc Compare the given version with the current running version,
|
||||
%% return 'newer' 'older' or 'same'.
|
||||
vsn_compare("v" ++ Vsn) ->
|
||||
vsn_compare(?EMQX_RELEASE_CE, Vsn);
|
||||
vsn_compare("e" ++ Vsn) ->
|
||||
vsn_compare(?EMQX_RELEASE_EE, Vsn).
|
||||
|
||||
%% @private Compare the second argument with the first argument, return
|
||||
%% 'newer' 'older' or 'same' semver comparison result.
|
||||
vsn_compare(Vsn1, Vsn2) ->
|
||||
ParsedVsn1 = parse_vsn(Vsn1),
|
||||
ParsedVsn2 = parse_vsn(Vsn2),
|
||||
case ParsedVsn1 =:= ParsedVsn2 of
|
||||
true ->
|
||||
same;
|
||||
false when ParsedVsn1 < ParsedVsn2 ->
|
||||
newer;
|
||||
false ->
|
||||
older
|
||||
end.
|
||||
|
||||
%% @private Parse the version string to a tuple.
|
||||
%% Return {{Major, Minor, Patch}, Suffix}.
|
||||
%% Where Suffix is either an empty string or a tuple like {"rc", 1}.
|
||||
%% NOTE: taking the nature ordering of the suffix:
|
||||
%% {"alpha", _} < {"beta", _} < {"rc", _} < ""
|
||||
parse_vsn(Vsn) ->
|
||||
try
|
||||
[V1, V2, V3 | Suffix0] = string:tokens(Vsn, ".-"),
|
||||
Suffix =
|
||||
case Suffix0 of
|
||||
"" ->
|
||||
%% For the case like "5.1.0"
|
||||
"";
|
||||
[ReleaseStage, Number] ->
|
||||
%% For the case like "5.1.0-rc.1"
|
||||
{ReleaseStage, list_to_integer(Number)}
|
||||
end,
|
||||
{{list_to_integer(V1), list_to_integer(V2), list_to_integer(V3)}, Suffix}
|
||||
catch
|
||||
_:_ ->
|
||||
erlang:error({invalid_version_string, Vsn})
|
||||
end.
|
||||
|
|
|
@ -32,7 +32,23 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_fill_default_values(_) ->
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
try
|
||||
?MODULE:TestCase({init, Config})
|
||||
catch
|
||||
error:function_clause ->
|
||||
Config
|
||||
end.
|
||||
|
||||
end_per_testcase(TestCase, Config) ->
|
||||
try
|
||||
?MODULE:TestCase({'end', Config})
|
||||
catch
|
||||
error:function_clause ->
|
||||
ok
|
||||
end.
|
||||
|
||||
t_fill_default_values(C) when is_list(C) ->
|
||||
Conf = #{
|
||||
<<"broker">> => #{
|
||||
<<"perf">> => #{},
|
||||
|
@ -61,7 +77,7 @@ t_fill_default_values(_) ->
|
|||
_ = emqx_utils_json:encode(WithDefaults),
|
||||
ok.
|
||||
|
||||
t_init_load(_Config) ->
|
||||
t_init_load(C) when is_list(C) ->
|
||||
ConfFile = "./test_emqx.conf",
|
||||
ok = file:write_file(ConfFile, <<"">>),
|
||||
ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)),
|
||||
|
@ -80,7 +96,7 @@ t_init_load(_Config) ->
|
|||
?assertMatch({ok, #{raw_config := 128}}, emqx:update_config([mqtt, max_topic_levels], 128)),
|
||||
ok = file:delete(DeprecatedFile).
|
||||
|
||||
t_unknown_rook_keys(_) ->
|
||||
t_unknown_root_keys(C) when is_list(C) ->
|
||||
?check_trace(
|
||||
#{timetrap => 1000},
|
||||
begin
|
||||
|
@ -98,7 +114,50 @@ t_unknown_rook_keys(_) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
t_init_load_emqx_schema(Config) ->
|
||||
t_cluster_hocon_backup({init, C}) ->
|
||||
C;
|
||||
t_cluster_hocon_backup({'end', _C}) ->
|
||||
File = "backup-test.hocon",
|
||||
Files = [File | filelib:wildcard(File ++ ".*.bak")],
|
||||
lists:foreach(fun file:delete/1, Files);
|
||||
t_cluster_hocon_backup(C) when is_list(C) ->
|
||||
Write = fun(Path, Content) ->
|
||||
%% avoid name clash
|
||||
timer:sleep(1),
|
||||
emqx_config:backup_and_write(Path, Content)
|
||||
end,
|
||||
File = "backup-test.hocon",
|
||||
%% write 12 times, 10 backups should be kept
|
||||
%% the latest one is File itself without suffix
|
||||
%% the oldest one is expected to be deleted
|
||||
N = 12,
|
||||
Inputs = lists:seq(1, N),
|
||||
Backups = lists:seq(N - 10, N - 1),
|
||||
InputContents = [integer_to_binary(I) || I <- Inputs],
|
||||
BackupContents = [integer_to_binary(I) || I <- Backups],
|
||||
lists:foreach(
|
||||
fun(Content) ->
|
||||
Write(File, Content)
|
||||
end,
|
||||
InputContents
|
||||
),
|
||||
LatestContent = integer_to_binary(N),
|
||||
?assertEqual({ok, LatestContent}, file:read_file(File)),
|
||||
Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
|
||||
Files = filelib:wildcard(File ++ ".*.bak"),
|
||||
?assert(lists:all(fun(F) -> re:run(F, Re) =/= nomatch end, Files)),
|
||||
%% keep only the latest 10
|
||||
?assertEqual(10, length(Files)),
|
||||
FilesSorted = lists:zip(lists:sort(Files), BackupContents),
|
||||
lists:foreach(
|
||||
fun({BackupFile, ExpectedContent}) ->
|
||||
?assertEqual({ok, ExpectedContent}, file:read_file(BackupFile))
|
||||
end,
|
||||
FilesSorted
|
||||
),
|
||||
ok.
|
||||
|
||||
t_init_load_emqx_schema(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
|
@ -127,7 +186,7 @@ t_init_load_emqx_schema(Config) ->
|
|||
Default
|
||||
).
|
||||
|
||||
t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) ->
|
||||
t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
|
@ -140,7 +199,7 @@ t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) ->
|
|||
emqx_config:get([zones, no_exists])
|
||||
).
|
||||
|
||||
t_init_zones_load_other_schema(Config) ->
|
||||
t_init_zones_load_other_schema(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
|
@ -159,7 +218,7 @@ t_init_zones_load_other_schema(Config) ->
|
|||
emqx_config:get([zones, default])
|
||||
).
|
||||
|
||||
t_init_zones_with_user_defined_default_zone(Config) ->
|
||||
t_init_zones_with_user_defined_default_zone(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given user defined config for default zone
|
||||
ConfFile = prepare_conf_file(
|
||||
|
@ -176,7 +235,7 @@ t_init_zones_with_user_defined_default_zone(Config) ->
|
|||
%% Then others are defaults
|
||||
?assertEqual(ExpectedOthers, Others).
|
||||
|
||||
t_init_zones_with_user_defined_other_zone(Config) ->
|
||||
t_init_zones_with_user_defined_other_zone(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given user defined config for default zone
|
||||
ConfFile = prepare_conf_file(
|
||||
|
@ -196,7 +255,7 @@ t_init_zones_with_user_defined_other_zone(Config) ->
|
|||
%% Then default zone still have the defaults
|
||||
?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])).
|
||||
|
||||
t_init_zones_with_cust_root_mqtt(Config) ->
|
||||
t_init_zones_with_cust_root_mqtt(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given config file with mqtt user overrides
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config),
|
||||
|
@ -211,7 +270,7 @@ t_init_zones_with_cust_root_mqtt(Config) ->
|
|||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_default_zone_is_updated_after_global_defaults_updated(Config) ->
|
||||
t_default_zone_is_updated_after_global_defaults_updated(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty emqx conf
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
|
@ -227,7 +286,7 @@ t_default_zone_is_updated_after_global_defaults_updated(Config) ->
|
|||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_myzone_is_updated_after_global_defaults_updated(Config) ->
|
||||
t_myzone_is_updated_after_global_defaults_updated(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user override in myzone (none default zone)
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config),
|
||||
|
@ -251,7 +310,7 @@ t_myzone_is_updated_after_global_defaults_updated(Config) ->
|
|||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_zone_no_user_defined_overrides(Config) ->
|
||||
t_zone_no_user_defined_overrides(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user specified myzone
|
||||
ConfFile = prepare_conf_file(
|
||||
|
@ -268,7 +327,7 @@ t_zone_no_user_defined_overrides(Config) ->
|
|||
%% Then user defined value from config is not overwritten
|
||||
?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])).
|
||||
|
||||
t_zone_no_user_defined_overrides_internal_represent(Config) ->
|
||||
t_zone_no_user_defined_overrides_internal_represent(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user specified myzone
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config),
|
||||
|
@ -281,7 +340,7 @@ t_zone_no_user_defined_overrides_internal_represent(Config) ->
|
|||
?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||
?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])).
|
||||
|
||||
t_update_global_defaults_no_updates_on_user_overrides(Config) ->
|
||||
t_update_global_defaults_no_updates_on_user_overrides(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given default zone config in conf file.
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config),
|
||||
|
@ -293,7 +352,7 @@ t_update_global_defaults_no_updates_on_user_overrides(Config) ->
|
|||
%% Then the value is not reflected in default `zone'
|
||||
?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])).
|
||||
|
||||
t_zone_update_with_new_zone(Config) ->
|
||||
t_zone_update_with_new_zone(Config) when is_list(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given loaded an empty conf file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
|
@ -308,7 +367,7 @@ t_zone_update_with_new_zone(Config) ->
|
|||
emqx_config:get([zones, myzone, mqtt])
|
||||
).
|
||||
|
||||
t_init_zone_with_global_defaults(_Config) ->
|
||||
t_init_zone_with_global_defaults(Config) when is_list(Config) ->
|
||||
%% Given uninitialized empty config
|
||||
emqx_config:erase_all(),
|
||||
Zones = #{myzone => #{mqtt => #{max_inflight => 3}}},
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_release_tests).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
vsn_compre_test_() ->
|
||||
CurrentVersion = emqx_release:version_with_prefix(),
|
||||
[
|
||||
{"must be 'same' when comparing with current version", fun() ->
|
||||
?assertEqual(same, emqx_release:vsn_compare(CurrentVersion))
|
||||
end},
|
||||
{"must be 'same' when comparing same version strings", fun() ->
|
||||
?assertEqual(same, emqx_release:vsn_compare("1.1.1", "1.1.1"))
|
||||
end},
|
||||
{"1.1.1 is older than 1.1.2", fun() ->
|
||||
?assertEqual(older, emqx_release:vsn_compare("1.1.2", "1.1.1")),
|
||||
?assertEqual(newer, emqx_release:vsn_compare("1.1.1", "1.1.2"))
|
||||
end},
|
||||
{"1.1.9 is older than 1.1.10", fun() ->
|
||||
?assertEqual(older, emqx_release:vsn_compare("1.1.10", "1.1.9")),
|
||||
?assertEqual(newer, emqx_release:vsn_compare("1.1.9", "1.1.10"))
|
||||
end},
|
||||
{"alpha is older than beta", fun() ->
|
||||
?assertEqual(older, emqx_release:vsn_compare("1.1.1-beta.1", "1.1.1-alpha.2")),
|
||||
?assertEqual(newer, emqx_release:vsn_compare("1.1.1-alpha.2", "1.1.1-beta.1"))
|
||||
end},
|
||||
{"beta is older than rc", fun() ->
|
||||
?assertEqual(older, emqx_release:vsn_compare("1.1.1-rc.1", "1.1.1-beta.2")),
|
||||
?assertEqual(newer, emqx_release:vsn_compare("1.1.1-beta.2", "1.1.1-rc.1"))
|
||||
end},
|
||||
{"rc is older than official cut", fun() ->
|
||||
?assertEqual(older, emqx_release:vsn_compare("1.1.1", "1.1.1-rc.1")),
|
||||
?assertEqual(newer, emqx_release:vsn_compare("1.1.1-rc.1", "1.1.1"))
|
||||
end},
|
||||
{"invalid version string will crash", fun() ->
|
||||
?assertError({invalid_version_string, "1.1.a"}, emqx_release:vsn_compare("v1.1.a")),
|
||||
?assertError(
|
||||
{invalid_version_string, "1.1.1-alpha"}, emqx_release:vsn_compare("e1.1.1-alpha")
|
||||
)
|
||||
end}
|
||||
].
|
|
@ -575,7 +575,7 @@ maybe_init_tnx_id(Node, TnxId) ->
|
|||
{atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
|
||||
ok.
|
||||
|
||||
%% @priv Cannot proceed until emqx app is ready.
|
||||
%% @private Cannot proceed until emqx app is ready.
|
||||
%% Otherwise the committed transaction catch up may fail.
|
||||
wait_for_emqx_ready() ->
|
||||
%% wait 10 seconds for emqx to start
|
||||
|
|
|
@ -42,6 +42,8 @@ start(_StartType, _StartArgs) ->
|
|||
stop(_State) ->
|
||||
ok.
|
||||
|
||||
%% Read the cluster config from the local node.
|
||||
%% This function is named 'override' due to historical reasons.
|
||||
get_override_config_file() ->
|
||||
Node = node(),
|
||||
case emqx_app:get_init_config_load_done() of
|
||||
|
@ -63,7 +65,7 @@ get_override_config_file() ->
|
|||
tnx_id => TnxId,
|
||||
node => Node,
|
||||
has_deprecated_file => HasDeprecateFile,
|
||||
release => emqx_app:get_release()
|
||||
release => emqx_release:version_with_prefix()
|
||||
}
|
||||
end,
|
||||
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
|
||||
|
@ -95,7 +97,7 @@ init_conf() ->
|
|||
%% Workaround for https://github.com/emqx/mria/issues/94:
|
||||
_ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
|
||||
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
|
||||
{ok, TnxId} = copy_override_conf_from_core_node(),
|
||||
{ok, TnxId} = sync_cluster_conf(),
|
||||
_ = emqx_app:set_init_tnx_id(TnxId),
|
||||
ok = init_load(),
|
||||
ok = emqx_app:set_init_config_load_done().
|
||||
|
@ -103,88 +105,137 @@ init_conf() ->
|
|||
cluster_nodes() ->
|
||||
mria:cluster_nodes(cores) -- [node()].
|
||||
|
||||
copy_override_conf_from_core_node() ->
|
||||
%% @doc Try to sync the cluster config from other core nodes.
|
||||
sync_cluster_conf() ->
|
||||
case cluster_nodes() of
|
||||
%% The first core nodes is self.
|
||||
[] ->
|
||||
?SLOG(debug, #{msg => "skip_copy_override_conf_from_core_node"}),
|
||||
%% The first core nodes is self.
|
||||
?SLOG(debug, #{
|
||||
msg => "skip_sync_cluster_conf",
|
||||
reason => "This is a single node, or the first node in the cluster"
|
||||
}),
|
||||
{ok, ?DEFAULT_INIT_TXN_ID};
|
||||
Nodes ->
|
||||
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
|
||||
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
||||
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
|
||||
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
|
||||
sync_cluster_conf2(Nodes)
|
||||
end.
|
||||
|
||||
%% @private Some core nodes are running, try to sync the cluster config from them.
|
||||
sync_cluster_conf2(Nodes) ->
|
||||
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
|
||||
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
||||
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
|
||||
case (Failed =/= [] orelse NotReady =/= []) of
|
||||
true when Ready =/= [] ->
|
||||
%% Some core nodes failed to reply.
|
||||
Warning = #{
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady,
|
||||
msg => "ignored_nodes_when_sync_cluster_conf"
|
||||
},
|
||||
?SLOG(warning, Warning);
|
||||
true ->
|
||||
%% There are core nodes running but no one was able to reply.
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_sync_cluster_conf",
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady
|
||||
});
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
case Ready of
|
||||
[] ->
|
||||
case should_proceed_with_boot() of
|
||||
true ->
|
||||
Warning = #{
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady,
|
||||
msg => "ignored_bad_nodes_when_copy_init_config"
|
||||
},
|
||||
?SLOG(warning, Warning);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
case Ready of
|
||||
[] ->
|
||||
%% Other core nodes running but no one replicated it successfully.
|
||||
?SLOG(error, #{
|
||||
msg => "copy_override_conf_from_core_node_failed",
|
||||
%% Act as if this node is alone, so it can
|
||||
%% finish the boot sequence and load the
|
||||
%% config for other nodes to copy it.
|
||||
?SLOG(info, #{
|
||||
msg => "skip_sync_cluster_conf",
|
||||
loading_from_disk => true,
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady
|
||||
}),
|
||||
|
||||
case should_proceed_with_boot() of
|
||||
true ->
|
||||
%% Act as if this node is alone, so it can
|
||||
%% finish the boot sequence and load the
|
||||
%% config for other nodes to copy it.
|
||||
?SLOG(info, #{
|
||||
msg => "skip_copy_override_conf_from_core_node",
|
||||
loading_from_disk => true,
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady
|
||||
}),
|
||||
{ok, ?DEFAULT_INIT_TXN_ID};
|
||||
false ->
|
||||
%% retry in some time
|
||||
Jitter = rand:uniform(2000),
|
||||
Timeout = 10000 + Jitter,
|
||||
?SLOG(info, #{
|
||||
msg => "copy_cluster_conf_from_core_node_retry",
|
||||
timeout => Timeout,
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady
|
||||
}),
|
||||
timer:sleep(Timeout),
|
||||
copy_override_conf_from_core_node()
|
||||
end;
|
||||
_ ->
|
||||
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
|
||||
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
|
||||
HasDeprecatedFile = has_deprecated_file(Info),
|
||||
?SLOG(debug, #{
|
||||
msg => "copy_cluster_conf_from_core_node_success",
|
||||
node => Node,
|
||||
has_deprecated_file => HasDeprecatedFile,
|
||||
local_release => emqx_app:get_release(),
|
||||
remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"),
|
||||
data_dir => emqx:data_dir(),
|
||||
tnx_id => TnxId
|
||||
{ok, ?DEFAULT_INIT_TXN_ID};
|
||||
false ->
|
||||
%% retry in some time
|
||||
Jitter = rand:uniform(2000),
|
||||
Timeout = 10000 + Jitter,
|
||||
timer:sleep(Timeout),
|
||||
?SLOG(warning, #{
|
||||
msg => "sync_cluster_conf_retry",
|
||||
timeout => Timeout,
|
||||
nodes => Nodes,
|
||||
failed => Failed,
|
||||
not_ready => NotReady
|
||||
}),
|
||||
ok = emqx_config:save_to_override_conf(
|
||||
HasDeprecatedFile,
|
||||
RawOverrideConf,
|
||||
#{override_to => cluster}
|
||||
),
|
||||
ok = sync_data_from_node(Node),
|
||||
{ok, TnxId}
|
||||
end
|
||||
sync_cluster_conf()
|
||||
end;
|
||||
_ ->
|
||||
sync_cluster_conf3(Ready)
|
||||
end.
|
||||
|
||||
%% @private Filter out the nodes which are running a newer version than this node.
|
||||
sync_cluster_conf3(Ready) ->
|
||||
NotNewer = fun({ok, #{release := RemoteRelease}}) ->
|
||||
try
|
||||
emqx_release:vsn_compare(RemoteRelease) =/= newer
|
||||
catch
|
||||
_:_ ->
|
||||
%% If the version is not valid (without v or e prefix),
|
||||
%% we know it's older than v5.1.0/e5.1.0
|
||||
true
|
||||
end
|
||||
end,
|
||||
case lists:filter(NotNewer, Ready) of
|
||||
[] ->
|
||||
%% All available core nodes are running a newer version than this node.
|
||||
%% Start this node without syncing cluster config from them.
|
||||
%% This is likely a restart of an older version node during cluster upgrade.
|
||||
NodesAndVersions = lists:map(
|
||||
fun({ok, #{node := Node, release := Release}}) ->
|
||||
#{node => Node, version => Release}
|
||||
end,
|
||||
Ready
|
||||
),
|
||||
?SLOG(warning, #{
|
||||
msg => "all_available_nodes_running_newer_version",
|
||||
hint =>
|
||||
"Booting this node without syncing cluster config from peer core nodes "
|
||||
"because other nodes are running a newer version",
|
||||
peer_nodes => NodesAndVersions
|
||||
}),
|
||||
{ok, ?DEFAULT_INIT_TXN_ID};
|
||||
Ready2 ->
|
||||
sync_cluster_conf4(Ready2)
|
||||
end.
|
||||
|
||||
%% @private Some core nodes are running and replied with their configs successfully.
|
||||
%% Try to sort the results and save the first one for local use.
|
||||
sync_cluster_conf4(Ready) ->
|
||||
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
|
||||
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
|
||||
HasDeprecatedFile = has_deprecated_file(Info),
|
||||
?SLOG(debug, #{
|
||||
msg => "sync_cluster_conf_success",
|
||||
synced_from_node => Node,
|
||||
has_deprecated_file => HasDeprecatedFile,
|
||||
local_release => emqx_app:get_release(),
|
||||
remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"),
|
||||
data_dir => emqx:data_dir(),
|
||||
tnx_id => TnxId
|
||||
}),
|
||||
ok = emqx_config:save_to_override_conf(
|
||||
HasDeprecatedFile,
|
||||
RawOverrideConf,
|
||||
#{override_to => cluster}
|
||||
),
|
||||
ok = sync_data_from_node(Node),
|
||||
{ok, TnxId}.
|
||||
|
||||
should_proceed_with_boot() ->
|
||||
TablesStatus = emqx_cluster_rpc:get_tables_status(),
|
||||
LocalNode = node(),
|
||||
|
|
|
@ -98,6 +98,34 @@ t_copy_deprecated_data_dir(_Config) ->
|
|||
stop_cluster(Nodes)
|
||||
end.
|
||||
|
||||
t_no_copy_from_newer_version_node(_Config) ->
|
||||
net_kernel:start(['master2@127.0.0.1', longnames]),
|
||||
ct:timetrap({seconds, 120}),
|
||||
snabbkaffe:fix_ct_logging(),
|
||||
Cluster = cluster([cluster_spec({core, 10}), cluster_spec({core, 11}), cluster_spec({core, 12})]),
|
||||
OKs = [ok, ok, ok],
|
||||
[First | Rest] = Nodes = start_cluster(Cluster),
|
||||
try
|
||||
File = "/configs/cluster.hocon",
|
||||
assert_config_load_done(Nodes),
|
||||
rpc:call(First, ?MODULE, create_data_dir, [File]),
|
||||
{OKs, []} = rpc:multicall(Nodes, application, stop, [emqx_conf]),
|
||||
{OKs, []} = rpc:multicall(Nodes, ?MODULE, set_data_dir_env, []),
|
||||
{OKs, []} = rpc:multicall(Nodes, meck, new, [
|
||||
emqx_release, [passthrough, no_history, no_link, non_strict]
|
||||
]),
|
||||
%% 99.9.9 is always newer than the current version
|
||||
{OKs, []} = rpc:multicall(Nodes, meck, expect, [
|
||||
emqx_release, version_with_prefix, 0, "e99.9.9"
|
||||
]),
|
||||
ok = rpc:call(First, application, start, [emqx_conf]),
|
||||
{[ok, ok], []} = rpc:multicall(Rest, application, start, [emqx_conf]),
|
||||
ok = assert_no_cluster_conf_copied(Rest, File),
|
||||
stop_cluster(Nodes),
|
||||
ok
|
||||
after
|
||||
stop_cluster(Nodes)
|
||||
end.
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -158,6 +186,17 @@ assert_data_copy_done([First0 | Rest], File) ->
|
|||
Rest
|
||||
).
|
||||
|
||||
assert_no_cluster_conf_copied([], _) ->
|
||||
ok;
|
||||
assert_no_cluster_conf_copied([Node | Nodes], File) ->
|
||||
NodeStr = atom_to_list(Node),
|
||||
?assertEqual(
|
||||
{error, enoent},
|
||||
file:read_file(NodeStr ++ File),
|
||||
#{node => Node}
|
||||
),
|
||||
assert_no_cluster_conf_copied(Nodes, File).
|
||||
|
||||
assert_config_load_done(Nodes) ->
|
||||
lists:foreach(
|
||||
fun(Node) ->
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
Avoid syncing cluser.hocon file from the nodes runing a newer version than self.
|
||||
|
||||
During cluster rolling upgrade, if an older version node has to restart due to whatever reason,
|
||||
if it copies the cluster.hocon file from a newer version node, it may fail to start.
|
||||
After this fix, the older version node will not copy the cluster.hocon file from a newer,
|
||||
so it will use its own cluster.hocon file to start.
|
Loading…
Reference in New Issue