%% Source file: emqx/apps/emqx_management/src/emqx_mgmt_data_backup.erl
%% (approx. 1000 lines, 33 KiB, Erlang)
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup).
-export([
export/0,
export/1,
import/1,
import/2,
format_error/1
]).
%% HTTP API
-export([
upload/2,
maybe_copy_and_import/2,
read_file/1,
delete_file/1,
list_files/0,
format_conf_errors/1,
format_db_errors/1
]).
-export([default_validate_mnesia_backup/1]).
-export_type([import_res/0]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
-include_lib("kernel/include/file.hrl").
-include_lib("emqx/include/logger.hrl").
-define(ROOT_BACKUP_DIR, "backup").
-define(BACKUP_MNESIA_DIR, "mnesia").
-define(TAR_SUFFIX, ".tar.gz").
-define(META_FILENAME, "META.hocon").
-define(CLUSTER_HOCON_FILENAME, "cluster.hocon").
-define(CONF_KEYS, [
[<<"delayed">>],
[<<"rewrite">>],
[<<"retainer">>],
[<<"mqtt">>],
[<<"alarm">>],
[<<"sysmon">>],
[<<"sys_topics">>],
[<<"limiter">>],
[<<"log">>],
[<<"persistent_session_store">>],
[<<"durable_sessions">>],
[<<"prometheus">>],
[<<"crl_cache">>],
[<<"conn_congestion">>],
[<<"force_shutdown">>],
[<<"flapping_detect">>],
[<<"broker">>],
[<<"force_gc">>],
[<<"zones">>],
[<<"slow_subs">>],
[<<"cluster">>, <<"links">>]
]).
%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first
-define(IMPORT_ORDER, [
emqx_connector,
emqx_bridge_v2
]).
-define(DEFAULT_OPTS, #{}).
-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX).
-define(fmt_tar_err(_Expr_),
fun() ->
case _Expr_ of
{error, _Reason_} -> {error, erl_tar:format_error(_Reason_)};
_Other_ -> _Other_
end
end()
).
-define(backup_path(_FileName_), filename:join(root_backup_dir(), _FileName_)).
-type backup_file_info() :: #{
filename := binary(),
size := non_neg_integer(),
created_at := binary(),
created_at_sec := integer(),
node := node(),
atom() => _
}.
-type db_error_details() :: #{mria:table() => {error, _}}.
-type config_error_details() :: #{emqx_utils_maps:config_key_path() => {error, _}}.
-type import_res() ::
{ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} | {error, _}.
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Exports cluster configuration and built-in database tables using
%% default (empty) options.
-spec export() -> {ok, backup_file_info()} | {error, _}.
export() ->
    export(?DEFAULT_OPTS).

%% @doc Exports backup metadata (META.hocon), cluster.hocon and all mnesia
%% tables declared by `emqx_db_backup' behaviour modules into a single
%% <backup name>.tar.gz archive under the root backup dir.
%% Opts may carry a `print_fun' used for progress reporting (see maybe_print/3).
-spec export(map()) -> {ok, backup_file_info()} | {error, _}.
export(Opts) ->
    {BackupName, TarDescriptor} = prepare_new_backup(Opts),
    try
        do_export(BackupName, TarDescriptor, Opts)
    catch
        Class:Reason:Stack ->
            ?SLOG(error, #{
                msg => "emqx_data_export_failed",
                exception => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            {error, Reason}
    after
        %% erl_tar:close/1 raises error if called on an already closed tar
        catch erl_tar:close(TarDescriptor),
        %% Remove the temporary working dir; the finished .tar.gz remains.
        file:del_dir_r(BackupName)
    end.

%% @doc Imports a backup archive using default (empty) options.
-spec import(file:filename_all()) -> import_res().
import(BackupFileName) ->
    import(BackupFileName, ?DEFAULT_OPTS).

%% @doc Imports a backup archive. Only permitted on nodes whose mria role is
%% `core' (see is_import_allowed/0). The file is looked up either by its
%% exact path or by bare basename inside the root backup dir (lookup_file/1).
-spec import(file:filename_all(), map()) -> import_res().
import(BackupFileName, Opts) ->
    case is_import_allowed() of
        true ->
            case lookup_file(str(BackupFileName)) of
                {ok, FilePath} ->
                    do_import(FilePath, Opts);
                Err ->
                    Err
            end;
        false ->
            {error, not_core_node}
    end.
%% @doc Imports a backup file that may reside on another node: if the file is
%% local (or a same-named file already sits in the local backup dir), imports
%% directly; otherwise fetches it from FileNode first (copy_and_import/2).
-spec maybe_copy_and_import(node(), file:filename_all()) -> import_res().
maybe_copy_and_import(FileNode, BackupFileName) when FileNode =:= node() ->
    import(BackupFileName, #{});
maybe_copy_and_import(FileNode, BackupFileName) ->
    %% The file can be already present locally
    case filelib:is_file(?backup_path(str(BackupFileName))) of
        true ->
            import(BackupFileName, #{});
        false ->
            copy_and_import(FileNode, BackupFileName)
    end.
%% @doc Returns the content of a backup file from this node's backup dir.
-spec read_file(file:filename_all()) ->
    {ok, #{filename => file:filename_all(), file => binary()}} | {error, _}.
read_file(BackupFileName) ->
    with_validated_name(BackupFileName, fun file:read_file/1).

%% @doc Deletes a backup file from this node's backup dir.
-spec delete_file(file:filename_all()) -> ok | {error, _}.
delete_file(BackupFileName) ->
    with_validated_name(BackupFileName, fun file:delete/1).

%% Shared body of read_file/1 and delete_file/1: checks the ".tar.gz" name,
%% resolves the file under the root backup dir, applies FileOp to it and
%% maps the POSIX enoent error to the API-level not_found reason.
with_validated_name(BackupFileName, FileOp) ->
    case validate_backup_name(str(BackupFileName)) of
        ok ->
            maybe_not_found(FileOp(?backup_path(BackupFileName)));
        Err ->
            Err
    end.
%% @doc Stores an uploaded backup archive under the root backup dir.
%% Refuses to overwrite an existing file with the same name.
-spec upload(file:filename_all(), binary()) -> ok | {error, _}.
upload(BackupFileName, BackupFileContent) ->
    NameStr = str(BackupFileName),
    case filelib:is_file(?backup_path(NameStr)) of
        true ->
            {error, {already_exists, NameStr}};
        false ->
            do_upload(NameStr, BackupFileContent)
    end.
%% @doc Lists all backup archives (*.tar.gz) found in this node's root backup
%% dir, with size and creation-time metadata; unreadable files are skipped.
-spec list_files() -> [backup_file_info()].
list_files() ->
    lists:filtermap(fun backup_file_entry/1, backup_files()).

%% Builds one backup_file_info() map; returns false if file info is not
%% readable so filtermap drops the entry.
backup_file_entry(File) ->
    case file:read_file_info(File, [{time, posix}]) of
        {ok, #file_info{size = Size, ctime = CTimeSec}} ->
            {true, #{
                filename => bin(filename:basename(File)),
                size => Size,
                created_at => emqx_utils_calendar:epoch_to_rfc3339(CTimeSec, second),
                created_at_sec => CTimeSec,
                node => node()
            }};
        _ ->
            false
    end.
%% Absolute paths of all *.tar.gz files in the root backup dir.
backup_files() ->
    filelib:wildcard(?backup_path("*" ++ ?TAR_SUFFIX)).
%% @doc Translates an import/upload error reason into a human-readable
%% string; reasons with no dedicated clause pass through unchanged.
format_error(not_core_node) ->
    str(
        io_lib:format(
            "backup data import is only allowed on core EMQX nodes, but requested node ~p is not core",
            [node()]
        )
    );
format_error(ee_to_ce_backup) ->
    "importing EMQX Enterprise data backup to EMQX is not allowed";
format_error(missing_backup_meta) ->
    %% Adjacent string literal and macro are concatenated at compile time.
    "invalid backup archive file: missing " ?META_FILENAME;
format_error(invalid_edition) ->
    "invalid backup archive content: wrong EMQX edition value in " ?META_FILENAME;
format_error(invalid_version) ->
    "invalid backup archive content: wrong EMQX version value in " ?META_FILENAME;
format_error(bad_archive_dir) ->
    "invalid backup archive content: all files in the archive must be under <backup name> directory";
format_error(not_found) ->
    "backup file not found";
format_error(bad_backup_name) ->
    "invalid backup name: file name must have " ?TAR_SUFFIX " extension";
format_error({unsupported_version, ImportVersion}) ->
    str(
        io_lib:format(
            "[warning] Backup version ~p is newer than EMQX version ~p, import is not allowed.~n",
            [str(ImportVersion), str(emqx_release:version())]
        )
    );
format_error({already_exists, BackupFileName}) ->
    str(io_lib:format("Backup file \"~s\" already exists", [BackupFileName]));
format_error(Reason) ->
    %% Fallback: unknown reasons are returned as-is.
    Reason.

%% @doc Renders config import errors (the config_errors map) into a list of
%% printable strings for the HTTP API.
format_conf_errors(Errors) ->
    Opts = #{print_fun => fun io_lib:format/2},
    maps:values(maps:map(conf_error_formatter(Opts), Errors)).

%% @doc Renders built-in database import errors (the db_errors map) into a
%% list of printable strings for the HTTP API.
format_db_errors(Errors) ->
    Opts = #{print_fun => fun io_lib:format/2},
    maps:values(
        maps:map(
            fun(Tab, Err) -> maybe_print_mnesia_import_err(Tab, Err, Opts) end,
            Errors
        )
    ).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%% Fetches the backup file content from the remote node via the backup RPC
%% protocol, stores it locally through upload/2 (which refuses duplicates),
%% then runs the regular local import.
copy_and_import(FileNode, BackupFileName) ->
    ReadRes = emqx_mgmt_data_backup_proto_v1:read_file(FileNode, BackupFileName, infinity),
    case ReadRes of
        {ok, Content} ->
            case upload(BackupFileName, Content) of
                ok -> import(BackupFileName, #{});
                UploadErr -> UploadErr
            end;
        ReadErr ->
            ReadErr
    end.
%% compatibility with import API that uses lookup_file/1 and returns `not_found` reason
%% Maps the POSIX enoent error to the API-level not_found; everything else
%% (ok results, other errors) passes through untouched.
maybe_not_found({error, enoent}) -> {error, not_found};
maybe_not_found(Other) -> Other.
%% Writes the uploaded archive to disk and fully validates it before
%% accepting it: extracts it, checks META.hocon, and (if present)
%% type-checks cluster.hocon against the schema. The uploaded file is
%% deleted again on any failure; the extraction dir is always cleaned up.
do_upload(BackupFileNameStr, BackupFileContent) ->
    FilePath = ?backup_path(BackupFileNameStr),
    BackupDir = ?backup_path(filename:basename(BackupFileNameStr, ?TAR_SUFFIX)),
    try
        ok = validate_backup_name(BackupFileNameStr),
        ok = file:write_file(FilePath, BackupFileContent),
        ok = extract_backup(FilePath),
        {ok, _} = validate_backup(BackupDir),
        HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
        case filelib:is_regular(HoconFileName) of
            true ->
                {ok, RawConf} = hocon:files([HoconFileName]),
                RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
                {ok, _} = validate_cluster_hocon(RawConf1),
                ok;
            false ->
                %% cluster.hocon can be missing in the backup
                ok
        end,
        ?SLOG(info, #{msg => "emqx_data_upload_success"})
    catch
        %% Failed `ok = ...' / `{ok, _} = ...' assertions above land here;
        %% the original {error, Reason} is extracted from the badmatch.
        error:{badmatch, {error, Reason}}:Stack ->
            ?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}),
            _ = file:delete(FilePath),
            {error, Reason};
        Class:Reason:Stack ->
            _ = file:delete(FilePath),
            ?SLOG(error, #{
                msg => "emqx_data_upload_failed",
                exception => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            {error, Reason}
    after
        %% The extracted dir was only needed for validation.
        file:del_dir_r(BackupDir)
    end.
%% Builds a timestamped backup working dir name and opens the target
%% .tar.gz for writing. Returns {WorkDirPath, TarDescriptor}.
prepare_new_backup(Opts) ->
    Ts = erlang:system_time(millisecond),
    {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
    BackupBaseName = str(
        io_lib:format(
            %% emqx-export-YYYY-MM-DD-HH-MM-SS.mmm (zero-padded fields)
            "emqx-export-~0p-~2..0b-~2..0b-~2..0b-~2..0b-~2..0b.~3..0b",
            [Y, M, D, HH, MM, SS, Ts rem 1000]
        )
    ),
    BackupName = ?backup_path(BackupBaseName),
    BackupTarName = ?tar(BackupName),
    maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
    {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
    {BackupName, TarDescriptor}.

%% Writes META.hocon (version/edition), cluster.hocon and the mnesia table
%% dumps into the archive, closes it, and returns the archive's file info
%% as a backup_file_info() map.
do_export(BackupName, TarDescriptor, Opts) ->
    BackupBaseName = filename:basename(BackupName),
    BackupTarName = ?tar(BackupName),
    Meta = #{
        version => emqx_release:version(),
        edition => emqx_release:edition()
    },
    MetaBin = bin(hocon_pp:do(Meta, #{})),
    MetaFileName = filename:join(BackupBaseName, ?META_FILENAME),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MetaBin, MetaFileName, [])),
    ok = export_cluster_hocon(TarDescriptor, BackupBaseName, Opts),
    ok = export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts),
    ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
    {ok, #file_info{
        size = Size,
        ctime = CTime
    }} = file:read_file_info(BackupTarName, [{time, posix}]),
    {ok, #{
        filename => bin(BackupTarName),
        size => Size,
        created_at => emqx_utils_calendar:epoch_to_rfc3339(CTime, second),
        created_at_sec => CTime,
        node => node()
    }}.
%% Adds the cluster override config to the archive as cluster.hocon.
%% Values referencing files in the node's data_dir are inlined first
%% (read_data_files/1) so the backup is self-contained.
export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) ->
    maybe_print("Exporting cluster configuration...~n", [], Opts),
    RawConf = emqx_config:read_override_conf(#{override_to => cluster}),
    maybe_print(
        "Exporting additional files from EMQX data_dir: ~p...~n", [str(emqx:data_dir())], Opts
    ),
    RawConf1 = read_data_files(RawConf),
    RawConfBin = bin(hocon_pp:do(RawConf1, #{})),
    NameInArchive = filename:join(BackupBaseName, ?CLUSTER_HOCON_FILENAME),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, RawConfBin, NameInArchive, [])).
%% Exports every table declared by emqx_db_backup behaviour modules into the
%% tar archive, one mnesia backup file per table.
export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) ->
    maybe_print("Exporting built-in database...~n", [], Opts),
    ExportTab = fun(Tab) ->
        export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts)
    end,
    lists:foreach(
        fun(Mod) -> lists:foreach(ExportTab, Mod:backup_tables()) end,
        tabs_to_backup()
    ).

%% Dumps one table to a temporary mnesia backup file, adds it to the archive
%% under <backup name>/mnesia/<table>, then deletes the temporary file.
export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
    maybe_print("Exporting ~p database table...~n", [TabName], Opts),
    {ok, TmpFile} = do_export_mnesia_tab(TabName, BackupName),
    NameInArchive = mnesia_backup_name(BackupBaseName, TabName),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, TmpFile, NameInArchive, [])),
    _ = file:delete(TmpFile),
    ok.
%% Dumps one table via a local mnesia checkpoint into a per-table file.
%% The checkpoint is always deactivated, even if the dump fails.
do_export_mnesia_tab(TabName, BackupName) ->
    Node = node(),
    try
        Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}],
        Opts =
            case mnesia:table_info(TabName, storage_type) of
                %% For ram tables, take content from memory rather than from
                %% an on-disk dump (mnesia ram_overrides_dump option).
                ram_copies -> [{ram_overrides_dump, true} | Opts0];
                _ -> Opts0
            end,
        %% allow_remote=false: the checkpoint must be on this node only, so
        %% we assert the retainer node list is exactly [node()].
        {ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts),
        MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
        ok = filelib:ensure_dir(MnesiaBackupName),
        ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
        {ok, MnesiaBackupName}
    after
        mnesia:deactivate_checkpoint(TabName)
    end.
-ifdef(TEST).
%% Indirection through ?MODULE so tests can mock the module list.
tabs_to_backup() ->
    %% Allow mocking in tests
    ?MODULE:mnesia_tabs_to_backup().
-else.
tabs_to_backup() ->
    mnesia_tabs_to_backup().
-endif.

%% All modules implementing the emqx_db_backup behaviour.
%% find_behaviours/1 already returns a flat list of module atoms, so the
%% previous `lists:flatten([M || M <- ...])' wrapper was a redundant
%% identity transformation and has been removed.
mnesia_tabs_to_backup() ->
    find_behaviours(emqx_db_backup).

%% Per-table mnesia backup file path: used both on disk (under the backup
%% working dir) and inside the archive (under <backup name>/mnesia/).
mnesia_backup_name(Path, TabName) ->
    filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]).
%% Data import is only allowed on nodes whose mria RLOG role is `core'
%% (import/2 returns {error, not_core_node} otherwise).
is_import_allowed() ->
    mria_rlog:role() =:= core.
%% Reads and validates META.hocon inside the extracted backup dir.
%% NOTE: `Meta' is bound to the WHOLE {ok, Map} tuple (the `=' sits on the
%% case pattern), and that tuple is what is returned on success — callers
%% match `{ok, _} = validate_backup(...)'.
validate_backup(BackupDir) ->
    case hocon:files([filename:join(BackupDir, ?META_FILENAME)]) of
        {ok, #{
            <<"edition">> := Edition,
            <<"version">> := Version
        }} = Meta ->
            validate(
                [
                    fun() -> check_edition(Edition) end,
                    fun() -> check_version(Version) end
                ],
                Meta
            );
        _ ->
            ?SLOG(error, #{msg => "missing_backup_meta", backup => BackupDir}),
            {error, missing_backup_meta}
    end.
%% Runs zero-arity validator funs in order: returns OkRes when all of them
%% return ok, otherwise stops and returns the first non-ok result.
validate([], OkRes) ->
    OkRes;
validate([Validator | Rest], OkRes) ->
    case Validator() of
        ok -> validate(Rest, OkRes);
        Error -> Error
    end.
%% Accepts <<"ce">> / <<"ee">> backups; the only forbidden combination is
%% importing an Enterprise ("ee") backup into a Community ("ce") node.
%% Anything else in the edition field is rejected as invalid.
check_edition(BackupEdition) when BackupEdition =:= <<"ce">>; BackupEdition =:= <<"ee">> ->
    EeIntoCe =
        BackupEdition =:= <<"ee">> andalso bin(emqx_release:edition()) =:= <<"ce">>,
    case EeIntoCe of
        true -> {error, ee_to_ce_backup};
        false -> ok
    end;
check_edition(BackupEdition) ->
    ?SLOG(error, #{msg => "invalid_backup_edition", edition => BackupEdition}),
    {error, invalid_edition}.
%% Rejects backups created on a newer EMQX version than the running one.
%% Comparison is lexicographic on {Major, Minor}: a backup is "newer" when
%% its major is greater, or the majors are equal and its minor is greater.
%% (The previous code compared minors regardless of majors — `ImpMaj > Maj
%% orelse ImpMin > Min' — which would wrongly reject e.g. a 4.9 backup on a
%% 5.1 node; 4.x archives are anyway treated as invalid before this step.)
check_version(ImportVersion) ->
    case parse_version_no_patch(ImportVersion) of
        {ok, {ImportMajorInt, ImportMinorInt}} ->
            Version = emqx_release:version(),
            {ok, {MajorInt, MinorInt}} = parse_version_no_patch(bin(Version)),
            IsNewer =
                ImportMajorInt > MajorInt orelse
                    (ImportMajorInt =:= MajorInt andalso ImportMinorInt > MinorInt),
            case IsNewer of
                true ->
                    {error, {unsupported_version, str(ImportVersion)}};
                false ->
                    ok
            end;
        Err ->
            Err
    end.
%% Extracts {Major, Minor} integers from a version binary like <<"5.3.0">>,
%% ignoring the patch (and any further) components. Relies on
%% emqx_utils_binary:bin_to_int/1 returning {Int, Rest} — presumably like
%% string:to_integer/1; TODO confirm against that module.
parse_version_no_patch(VersionBin) ->
    case string:split(VersionBin, ".", all) of
        [Major, Minor | _] ->
            {MajorInt, _} = emqx_utils_binary:bin_to_int(Major),
            {MinorInt, _} = emqx_utils_binary:bin_to_int(Minor),
            {ok, {MajorInt, MinorInt}};
        _ ->
            ?SLOG(error, #{msg => "failed_to_parse_backup_version", version => VersionBin}),
            {error, invalid_version}
    end.
%% Validates the archive (name, member paths, META.hocon) and then imports
%% the cluster config followed by the mnesia tables. Per-item errors are
%% collected into the result map rather than aborting the whole import;
%% the extracted working dir is always removed afterwards.
do_import(BackupFileName, Opts) ->
    BackupDir = ?backup_path(filename:basename(BackupFileName, ?TAR_SUFFIX)),
    maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
    try
        ok = validate_backup_name(BackupFileName),
        ok = extract_backup(BackupFileName),
        {ok, _} = validate_backup(BackupDir),
        ConfErrors = import_cluster_hocon(BackupDir, Opts),
        MnesiaErrors = import_mnesia_tabs(BackupDir, Opts),
        ?SLOG(info, #{msg => "emqx_data_import_success"}),
        {ok, #{db_errors => MnesiaErrors, config_errors => ConfErrors}}
    catch
        %% Failed `ok = ...' / `{ok, _} = ...' assertions above land here;
        %% the original {error, Reason} is extracted from the badmatch.
        error:{badmatch, {error, Reason}}:Stack ->
            ?SLOG(error, #{msg => "emqx_data_import_failed", reason => Reason, stacktrace => Stack}),
            {error, Reason};
        Class:Reason:Stack ->
            ?SLOG(error, #{
                msg => "emqx_data_import_failed",
                exception => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            {error, Reason}
    after
        file:del_dir_r(BackupDir)
    end.
%% Imports every table declared by emqx_db_backup modules from the backup
%% dir. Returns a map of per-table {error, _} results (ok results are
%% filtered out). NOTE: lists:foldr traverses from the right, so tables are
%% actually imported in reverse tabs_to_backup() order.
import_mnesia_tabs(BackupDir, Opts) ->
    maybe_print("Importing built-in database...~n", [], Opts),
    filter_errors(
        lists:foldr(
            fun(Mod, Acc) ->
                Tabs = Mod:backup_tables(),
                lists:foldr(
                    fun(Tab, InAcc) ->
                        InAcc#{Tab => import_mnesia_tab(BackupDir, Mod, Tab, Opts)}
                    end,
                    Acc,
                    Tabs
                )
            end,
            #{},
            tabs_to_backup()
        )
    ).
%% Restores one table if its per-table backup file exists; a missing file is
%% tolerated (logged and reported ok), since backups may not contain every
%% table.
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
    ok | {ok, no_backup_file} | {error, term()} | no_return().
import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
    MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
    case filelib:is_regular(MnesiaBackupFileName) of
        true ->
            maybe_print("Importing ~p database table...~n", [TabName], Opts),
            restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts);
        false ->
            maybe_print("No backup file for ~p database table...~n", [TabName], Opts),
            ?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}),
            ok
    end.
%% Validates (and possibly migrates) the per-table backup file, then
%% restores it into the live table. The backup file is deleted afterwards
%% in all cases to free disk space early.
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
    Validated = validate_mnesia_backup(MnesiaBackupFileName, Mod),
    try
        case Validated of
            {ok, #{backup_file := BackupFile}} ->
                %% As we use keep_tables option, we don't need to modify 'copies' (nodes)
                %% in a backup file before restoring it, as `mnsia:restore/2` will ignore
                %% backed-up schema and keep the current table schema unchanged
                Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
                case Restored of
                    %% Exactly this one table must have been restored.
                    {atomic, [TabName]} ->
                        on_table_imported(Mod, TabName, Opts);
                    RestoreErr ->
                        ?SLOG(error, #{
                            msg => "failed_to_restore_mnesia_backup",
                            table => TabName,
                            backup => BackupDir,
                            reason => RestoreErr
                        }),
                        maybe_print_mnesia_import_err(TabName, RestoreErr, Opts),
                        {error, RestoreErr}
                end;
            PrepareErr ->
                ?SLOG(error, #{
                    msg => "failed_to_prepare_mnesia_backup_for_restoring",
                    table => TabName,
                    backup => BackupDir,
                    reason => PrepareErr
                }),
                maybe_print_mnesia_import_err(TabName, PrepareErr, Opts),
                PrepareErr
        end
    after
        %% Cleanup files as soon as they are not needed any more for more efficient disk usage
        _ = file:delete(MnesiaBackupFileName)
    end.
%% Invokes the module's optional on_backup_table_imported/2 callback after a
%% successful restore. Exceptions raised by the callback are logged and
%% converted into {error, Reason}; modules without the callback yield ok.
on_table_imported(Mod, Tab, Opts) ->
    case erlang:function_exported(Mod, on_backup_table_imported, 2) of
        true ->
            try
                Mod:on_backup_table_imported(Tab, Opts)
            catch
                Class:Reason:Stack ->
                    ?SLOG(error, #{
                        msg => "post_database_import_callback_failed",
                        table => Tab,
                        module => Mod,
                        exception => Class,
                        reason => Reason,
                        stacktrace => Stack
                    }),
                    {error, Reason}
            end;
        false ->
            ok
    end.
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...
%%
%% The validator fun (see mnesia_backup_validator/1) may throw to abort
%% traversal early; the old-style `catch' captures the resulting error
%% value. A reason whose second element is `over' means "valid, stop here";
%% `migrate' requests a schema migration pass over the file.
validate_mnesia_backup(MnesiaBackupFileName, Mod) ->
    Init = #{backup_file => MnesiaBackupFileName},
    Validated =
        catch mnesia:traverse_backup(
            MnesiaBackupFileName,
            mnesia_backup,
            dummy,
            read_only,
            mnesia_backup_validator(Mod),
            Init
        ),
    case Validated of
        ok ->
            {ok, Init};
        {error, {_, over}} ->
            {ok, Init};
        {error, {_, migrate}} ->
            migrate_mnesia_backup(MnesiaBackupFileName, Mod, Init);
        Error ->
            Error
    end.
%% if the module has validator callback, use it else use the default
%% Wraps the chosen schema validator into a mnesia:traverse_backup fun:
%% `ok' keeps the item and continues, `{ok, Break}' aborts traversal by
%% throwing {error, Break} (interpreted by validate_mnesia_backup/2), and
%% any other result aborts the traversal with that error.
mnesia_backup_validator(Mod) ->
    Validator =
        case erlang:function_exported(Mod, validate_mnesia_backup, 1) of
            true ->
                fun Mod:validate_mnesia_backup/1;
            _ ->
                fun default_validate_mnesia_backup/1
        end,
    fun(Schema, Acc) ->
        case Validator(Schema) of
            ok ->
                {[Schema], Acc};
            {ok, Break} ->
                throw({error, Break});
            Error ->
                throw(Error)
        end
    end.
%% Default backup validator: a backed-up schema entry is acceptable only if
%% its attribute (column) list is identical to the live table's; all
%% non-schema items pass through unchecked.
default_validate_mnesia_backup({schema, Tab, CreateList}) ->
    ImportAttributes = proplists:get_value(attributes, CreateList),
    Attributes = mnesia:table_info(Tab, attributes),
    case ImportAttributes == Attributes of
        true ->
            ok;
        false ->
            {error, different_table_schema}
    end;
default_validate_mnesia_backup(_Other) ->
    ok.
%% Rewrites a backup whose schema needs migration: every item is passed
%% through Mod:migrate_mnesia_backup/1 and written to a ".migrate" copy,
%% which becomes the file to restore (backup_file is replaced in Acc).
%% Modules without a migrator callback yield {error, no_migrator}.
migrate_mnesia_backup(MnesiaBackupFileName, Mod, Acc) ->
    case erlang:function_exported(Mod, migrate_mnesia_backup, 1) of
        true ->
            MigrateFile = MnesiaBackupFileName ++ ".migrate",
            Migrator = fun(Schema, InAcc) ->
                case Mod:migrate_mnesia_backup(Schema) of
                    {ok, NewSchema} ->
                        {[NewSchema], InAcc};
                    Error ->
                        throw(Error)
                end
            end,
            %% Old-style catch converts a thrown migration error into a value.
            catch mnesia:traverse_backup(
                MnesiaBackupFileName,
                MigrateFile,
                Migrator,
                Acc#{backup_file := MigrateFile}
            );
        _ ->
            {error, no_migrator}
    end.
%% Extracts the archive into the root backup dir, after checking that every
%% member path is rooted at <backup name>/ (rejects archives that would
%% write outside their own directory).
extract_backup(BackupFileName) ->
    BackupDir = root_backup_dir(),
    ok = validate_filenames(BackupFileName),
    ?fmt_tar_err(erl_tar:extract(BackupFileName, [{cwd, BackupDir}, compressed])).

%% Ensures all member paths inside the tar have the backup's own base name
%% as their first path component.
validate_filenames(BackupFileName) ->
    {ok, FileNames} = ?fmt_tar_err(erl_tar:table(BackupFileName, [compressed])),
    BackupName = filename:basename(BackupFileName, ?TAR_SUFFIX),
    IsValid = lists:all(
        fun(FileName) ->
            [Root | _] = filename:split(FileName),
            Root =:= BackupName
        end,
        FileNames
    ),
    case IsValid of
        true -> ok;
        false -> {error, bad_archive_dir}
    end.
%% Applies cluster.hocon from the backup, if present: upgrades the raw
%% config via the schema module, validates it, then imports it. Returns a
%% map of per-path import errors (empty when there are none, or when the
%% backup carries no cluster.hocon at all).
import_cluster_hocon(BackupDir, Opts) ->
    HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
    case filelib:is_regular(HoconFileName) of
        true ->
            {ok, RawConf} = hocon:files([HoconFileName]),
            RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
            {ok, _} = validate_cluster_hocon(RawConf1),
            maybe_print("Importing cluster configuration...~n", [], Opts),
            %% At this point, when all validations have been passed, we want to log errors (if any)
            %% but proceed with the next items, instead of aborting the whole import operation
            do_import_conf(RawConf1, Opts);
        false ->
            maybe_print("No cluster configuration to be imported.~n", [], Opts),
            ?SLOG(info, #{
                msg => "no_backup_hocon_config_to_import",
                backup => BackupDir
            }),
            #{}
    end.
%% @doc Lets the schema module migrate a raw config map from an older
%% layout, when it exports upgrade_raw_conf/1; otherwise returns the
%% config unchanged.
upgrade_raw_conf(SchemaMod, RawConf) ->
    %% Force-load the module so function_exported/3 gives a reliable answer.
    _ = SchemaMod:module_info(),
    case erlang:function_exported(SchemaMod, upgrade_raw_conf, 1) of
        true ->
            %% TODO make it a schema module behaviour in hocon_schema
            %% Direct M:F(A) call instead of apply/3 — same dynamic dispatch,
            %% clearer to read and to static analysis.
            SchemaMod:upgrade_raw_conf(RawConf);
        false ->
            RawConf
    end.
%% Inlines external file references from the raw config: any binary value
%% that starts with the node's data dir path (relative or absolute form)
%% is replaced by that file's content, making the backup self-contained.
read_data_files(RawConf) ->
    DataDir = bin(emqx:data_dir()),
    {ok, Cwd} = file:get_cwd(),
    AbsDataDir = bin(filename:join(Cwd, DataDir)),
    RawConf1 = emqx_authz:maybe_read_files(RawConf),
    emqx_utils_maps:deep_convert(RawConf1, fun read_data_file/4, [DataDir, AbsDataDir]).
%% Matches binaries that start with the given directory prefix.
-define(dir_pattern(_Dir_), <<_Dir_:(byte_size(_Dir_))/binary, _/binary>>).
%% deep_convert callback: keeps Key; replaces Val with file content when it
%% is a path under DataDir or AbsDataDir, otherwise leaves it unchanged.
read_data_file(Key, Val, DataDir, AbsDataDir) ->
    Val1 =
        case Val of
            ?dir_pattern(DataDir) = FileName ->
                do_read_file(FileName);
            ?dir_pattern(AbsDataDir) = FileName ->
                do_read_file(FileName);
            V ->
                V
        end,
    {Key, Val1}.
%% Best-effort read: on failure logs a warning and keeps the original path
%% value in place of the content.
do_read_file(FileName) ->
    case file:read_file(FileName) of
        {ok, Content} ->
            Content;
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "failed_to_read_data_file",
                filename => FileName,
                reason => Reason
            }),
            FileName
    end.
%% Schema-checks the imported raw config merged on top of the current one.
%% ACL file content is materialized to disk first so file-path fields
%% satisfy the schema.
validate_cluster_hocon(RawConf) ->
    %% write ACL file to comply with the schema...
    RawConf1 = emqx_authz:maybe_write_files(RawConf),
    emqx_hocon:check(
        emqx_conf:schema_module(),
        maps:merge(emqx:get_raw_config([]), RawConf1),
        #{atom_key => false, required => false}
    ).
%% Imports the generic ?CONF_KEYS roots first, then delegates to each
%% emqx_config_backup module in dependency order (?IMPORT_ORDER — e.g.
%% connectors before bridges). Per-path errors are collected and reported,
%% not fatal; the accumulated error map is returned.
do_import_conf(RawConf, Opts) ->
    GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
    maybe_print_conf_errors(GenConfErrs, Opts),
    Modules = sort_importer_modules(find_behaviours(emqx_config_backup)),
    Errors = lists:foldl(print_ok_results_collect_errors(RawConf, Opts), GenConfErrs, Modules),
    maybe_print_conf_errors(Errors, Opts),
    Errors.
%% Returns a fold fun that feeds RawConf to Module:import_config/1,
%% printing changed paths for successful results and accumulating errors
%% (keyed by [RootKey]) for failed ones.
print_ok_results_collect_errors(RawConf, Opts) ->
    fun(Module, Errors) ->
        case Module:import_config(RawConf) of
            {results, {OkResults, ErrResults}} ->
                print_ok_results(OkResults, Opts),
                collect_errors(ErrResults, Errors);
            {ok, OkResult} ->
                print_ok_results([OkResult], Opts),
                Errors;
            {error, ErrResult} ->
                collect_errors([ErrResult], Errors)
        end
    end.
%% Reports the changed config paths of each successful import result.
print_ok_results(Results, Opts) ->
    Report = fun(#{changed := Changed}) -> maybe_print_changed(Changed, Opts) end,
    lists:foreach(Report, Results).
%% Folds failed import results into the accumulated error map, keyed by the
%% single-element path [RootKey].
collect_errors(Results, Errors) ->
    Insert = fun(#{root_key := RootKey, reason := Reason}, Acc) ->
        Acc#{[RootKey] => Reason}
    end,
    lists:foldr(Insert, Errors, Results).
%% Sorts importer modules so that those listed in ?IMPORT_ORDER come first,
%% in that order. Modules not listed all share the same sort key
%% (length of ?IMPORT_ORDER) and therefore sort after the ordered ones.
sort_importer_modules(Modules) ->
    lists:sort(
        fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end,
        Modules
    ).
%% Returns the 0-based position of Elem in List, or length(List) when the
%% element is absent (so absent elements sort after all listed ones).
order(Elem, List) ->
    order(Elem, List, 0).

order(_Elem, [], Index) ->
    Index;
order(Elem, [Elem | _Rest], Index) ->
    Index;
order(Elem, [_Other | Rest], Index) ->
    order(Elem, Rest, Index + 1).
%% Applies each ?CONF_KEYS root present in the imported Data via
%% emqx_conf:update/3 (cluster override). Roots absent from the backup are
%% reported as ok without touching the running config.
%% Returns [{[KeyPath], Result}] for filter_errors/1.
import_generic_conf(Data) ->
    lists:map(
        fun(KeyPath) ->
            case emqx_utils_maps:deep_get(KeyPath, Data, undefined) of
                undefined -> {[KeyPath], ok};
                Conf -> {[KeyPath], emqx_conf:update(KeyPath, Conf, #{override_to => cluster})}
            end
        end,
        ?CONF_KEYS
    ).
%% Reports every config path that existed before the import and has been
%% overwritten by it (only when a print_fun is configured).
maybe_print_changed(Changed, Opts) ->
    lists:foreach(
        fun(ChangedPath) ->
            maybe_print(
                "Config key path ~p was present before import and "
                "has been overwritten.~n",
                [pretty_path(ChangedPath)],
                Opts
            )
        end,
        Changed
    ).
%% Reports each accumulated per-path import error.
maybe_print_conf_errors(Errors, Opts) ->
    maps:foreach(conf_error_formatter(Opts), Errors).
%% Returns a (Path, Err) formatter fun used both for printing and for
%% rendering API error strings (see format_conf_errors/1).
conf_error_formatter(Opts) ->
    fun(Path, Err) ->
        maybe_print(
            "Failed to import the following config path: ~p, reason: ~p~n",
            [pretty_path(Path), Err],
            Opts
        )
    end.
%% Keeps only the map entries whose value is an {error, _} tuple.
filter_errors(Results) ->
    IsError = fun
        (_Path, {error, _Reason}) -> true;
        (_Path, _Ok) -> false
    end,
    maps:filter(IsError, Results).
%% Renders a config key path (list of atoms/binaries) as a dot-separated
%% string, e.g. [<<"a">>, <<"b">>] -> "a.b".
pretty_path(Path) ->
    str(lists:join(".", [str(Segment) || Segment <- Path])).

%% Converts an atom or unicode chardata to a string (list).
%% The atom clause must come first: atoms are not chardata.
str(Term) when is_atom(Term) ->
    atom_to_list(Term);
str(Term) ->
    unicode:characters_to_list(Term).

%% Converts an atom or unicode chardata to a UTF-8 binary.
bin(Term) when is_atom(Term) ->
    atom_to_binary(Term, utf8);
bin(Term) ->
    unicode:characters_to_binary(Term).
%% A valid backup file name is exactly <base>.tar.gz — i.e. stripping the
%% suffix and re-adding it reproduces the basename. Directory components
%% are ignored for the check.
validate_backup_name(FileName) ->
    Expected = filename:basename(FileName, ?TAR_SUFFIX) ++ ?TAR_SUFFIX,
    case filename:basename(FileName) of
        Expected -> ok;
        _Other -> {error, bad_backup_name}
    end.
%% Resolves a backup file either by an exact existing path, or — for bare
%% basenames only — inside the root backup dir. Names with directory
%% components that do not exist as given are rejected, so callers cannot
%% traverse into the backup dir with relative paths.
lookup_file(FileName) ->
    case filelib:is_regular(FileName) of
        true ->
            {ok, FileName};
        false ->
            %% Only lookup by basename, don't allow to lookup by file path
            case FileName =:= filename:basename(FileName) of
                true ->
                    FilePath = ?backup_path(FileName),
                    case filelib:is_file(FilePath) of
                        true -> {ok, FilePath};
                        false -> {error, not_found}
                    end;
                false ->
                    {error, not_found}
            end
    end.
%% Returns (and lazily creates) <data_dir>/backup.
root_backup_dir() ->
    Dir = filename:join(emqx:data_dir(), ?ROOT_BACKUP_DIR),
    ok = ensure_path(Dir),
    Dir.
%% filelib:ensure_path/1 only exists since OTP 25; emulate it via
%% ensure_dir/1 (which creates parents of the given file) on older releases.
-if(?OTP_RELEASE < 25).
ensure_path(Path) -> filelib:ensure_dir(filename:join([Path, "dummy"])).
-else.
ensure_path(Path) -> filelib:ensure_path(Path).
-endif.
%% Converts a millisecond epoch timestamp to the node's local calendar time.
local_datetime(MillisecondTs) ->
    calendar:system_time_to_local_time(MillisecondTs, millisecond).

%% Invokes the caller-supplied print_fun (progress reporting) when one is
%% present in Opts; otherwise silently returns ok.
maybe_print(Format, Args, Opts) when is_map(Opts) ->
    case Opts of
        #{print_fun := PrintFun} -> PrintFun(Format, Args);
        _ -> ok
    end;
maybe_print(_Format, _Args, _Opts) ->
    ok.

%% Reports a failed table import through the optional print_fun.
maybe_print_mnesia_import_err(TabName, Error, Opts) ->
    maybe_print(
        "[error] Failed to import built-in database table: ~p, reason: ~p~n",
        [TabName, Error],
        Opts
    ).
%% @doc All modules across loaded emqx applications that declare Behaviour.
find_behaviours(Behaviour) ->
    find_behaviours(Behaviour, apps(), []).
%% Based on minirest_api:find_api_modules/1
%% Scans the module list of each application and keeps modules that carry
%% Behaviour in a -behaviour/-behavior attribute.
find_behaviours(_Behaviour, [] = _Apps, Acc) ->
    Acc;
find_behaviours(Behaviour, [App | Apps], Acc) ->
    case application:get_key(App, modules) of
        undefined ->
            %% App not loaded / no modules key: skip it but keep scanning the
            %% remaining apps. (Previously this clause returned Acc, silently
            %% aborting the scan of everything after the first such app.)
            find_behaviours(Behaviour, Apps, Acc);
        {ok, Modules} ->
            NewAcc = lists:filter(
                fun(Module) -> has_behaviour(Module, Behaviour) end,
                Modules
            ),
            find_behaviours(Behaviour, Apps, NewAcc ++ Acc)
    end.
%% True when Module declares Behaviour via -behaviour or -behavior
%% (both spellings are accepted by the compiler).
has_behaviour(Module, Behaviour) ->
    Info = Module:module_info(attributes),
    Bhvrs = lists:flatten(
        proplists:get_all_values(behavior, Info) ++
            proplists:get_all_values(behaviour, Info)
    ),
    lists:member(Behaviour, Bhvrs).
%% Names of all loaded applications whose name starts with "emqx".
apps() ->
    [
        App
     || {App, _, _} <- application:loaded_applications(),
        case re:run(atom_to_list(App), "^emqx") of
            {match, [{0, 4}]} -> true;
            _ -> false
        end
    ].