1000 lines
33 KiB
Erlang
1000 lines
33 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_data_backup).
|
|
|
|
-export([
|
|
export/0,
|
|
export/1,
|
|
import/1,
|
|
import/2,
|
|
format_error/1
|
|
]).
|
|
|
|
%% HTTP API
|
|
-export([
|
|
upload/2,
|
|
maybe_copy_and_import/2,
|
|
read_file/1,
|
|
delete_file/1,
|
|
list_files/0,
|
|
format_conf_errors/1,
|
|
format_db_errors/1
|
|
]).
|
|
|
|
-export([default_validate_mnesia_backup/1]).
|
|
|
|
-export_type([import_res/0]).
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
|
|
|
-include_lib("kernel/include/file.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-define(ROOT_BACKUP_DIR, "backup").
|
|
-define(BACKUP_MNESIA_DIR, "mnesia").
|
|
-define(TAR_SUFFIX, ".tar.gz").
|
|
-define(META_FILENAME, "META.hocon").
|
|
-define(CLUSTER_HOCON_FILENAME, "cluster.hocon").
|
|
-define(CONF_KEYS, [
|
|
[<<"delayed">>],
|
|
[<<"rewrite">>],
|
|
[<<"retainer">>],
|
|
[<<"mqtt">>],
|
|
[<<"alarm">>],
|
|
[<<"sysmon">>],
|
|
[<<"sys_topics">>],
|
|
[<<"limiter">>],
|
|
[<<"log">>],
|
|
[<<"persistent_session_store">>],
|
|
[<<"durable_sessions">>],
|
|
[<<"prometheus">>],
|
|
[<<"crl_cache">>],
|
|
[<<"conn_congestion">>],
|
|
[<<"force_shutdown">>],
|
|
[<<"flapping_detect">>],
|
|
[<<"broker">>],
|
|
[<<"force_gc">>],
|
|
[<<"zones">>],
|
|
[<<"slow_subs">>],
|
|
[<<"cluster">>, <<"links">>]
|
|
]).
|
|
|
|
%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first
|
|
-define(IMPORT_ORDER, [
|
|
emqx_connector,
|
|
emqx_bridge_v2
|
|
]).
|
|
|
|
-define(DEFAULT_OPTS, #{}).
|
|
-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX).
|
|
-define(fmt_tar_err(_Expr_),
|
|
fun() ->
|
|
case _Expr_ of
|
|
{error, _Reason_} -> {error, erl_tar:format_error(_Reason_)};
|
|
_Other_ -> _Other_
|
|
end
|
|
end()
|
|
).
|
|
-define(backup_path(_FileName_), filename:join(root_backup_dir(), _FileName_)).
|
|
|
|
-type backup_file_info() :: #{
|
|
filename := binary(),
|
|
size := non_neg_integer(),
|
|
created_at := binary(),
|
|
created_at_sec := integer(),
|
|
node := node(),
|
|
atom() => _
|
|
}.
|
|
|
|
-type db_error_details() :: #{mria:table() => {error, _}}.
|
|
-type config_error_details() :: #{emqx_utils_maps:config_key_path() => {error, _}}.
|
|
-type import_res() ::
|
|
{ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} | {error, _}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% APIs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec export() -> {ok, backup_file_info()} | {error, _}.
|
|
export() ->
|
|
export(?DEFAULT_OPTS).
|
|
|
|
-spec export(map()) -> {ok, backup_file_info()} | {error, _}.
|
|
export(Opts) ->
|
|
{BackupName, TarDescriptor} = prepare_new_backup(Opts),
|
|
try
|
|
do_export(BackupName, TarDescriptor, Opts)
|
|
catch
|
|
Class:Reason:Stack ->
|
|
?SLOG(error, #{
|
|
msg => "emqx_data_export_failed",
|
|
exception => Class,
|
|
reason => Reason,
|
|
stacktrace => Stack
|
|
}),
|
|
{error, Reason}
|
|
after
|
|
%% erl_tar:close/1 raises error if called on an already closed tar
|
|
catch erl_tar:close(TarDescriptor),
|
|
file:del_dir_r(BackupName)
|
|
end.
|
|
|
|
-spec import(file:filename_all()) -> import_res().
|
|
import(BackupFileName) ->
|
|
import(BackupFileName, ?DEFAULT_OPTS).
|
|
|
|
-spec import(file:filename_all(), map()) -> import_res().
|
|
import(BackupFileName, Opts) ->
|
|
case is_import_allowed() of
|
|
true ->
|
|
case lookup_file(str(BackupFileName)) of
|
|
{ok, FilePath} ->
|
|
do_import(FilePath, Opts);
|
|
Err ->
|
|
Err
|
|
end;
|
|
false ->
|
|
{error, not_core_node}
|
|
end.
|
|
|
|
-spec maybe_copy_and_import(node(), file:filename_all()) -> import_res().
|
|
maybe_copy_and_import(FileNode, BackupFileName) when FileNode =:= node() ->
|
|
import(BackupFileName, #{});
|
|
maybe_copy_and_import(FileNode, BackupFileName) ->
|
|
%% The file can be already present locally
|
|
case filelib:is_file(?backup_path(str(BackupFileName))) of
|
|
true ->
|
|
import(BackupFileName, #{});
|
|
false ->
|
|
copy_and_import(FileNode, BackupFileName)
|
|
end.
|
|
|
|
-spec read_file(file:filename_all()) ->
|
|
{ok, #{filename => file:filename_all(), file => binary()}} | {error, _}.
|
|
read_file(BackupFileName) ->
|
|
BackupFileNameStr = str(BackupFileName),
|
|
case validate_backup_name(BackupFileNameStr) of
|
|
ok ->
|
|
maybe_not_found(file:read_file(?backup_path(BackupFileName)));
|
|
Err ->
|
|
Err
|
|
end.
|
|
|
|
-spec delete_file(file:filename_all()) -> ok | {error, _}.
|
|
delete_file(BackupFileName) ->
|
|
BackupFileNameStr = str(BackupFileName),
|
|
case validate_backup_name(BackupFileNameStr) of
|
|
ok ->
|
|
maybe_not_found(file:delete(?backup_path(BackupFileName)));
|
|
Err ->
|
|
Err
|
|
end.
|
|
|
|
-spec upload(file:filename_all(), binary()) -> ok | {error, _}.
|
|
upload(BackupFileName, BackupFileContent) ->
|
|
BackupFileNameStr = str(BackupFileName),
|
|
FilePath = ?backup_path(BackupFileNameStr),
|
|
case filelib:is_file(FilePath) of
|
|
true ->
|
|
{error, {already_exists, BackupFileNameStr}};
|
|
false ->
|
|
do_upload(BackupFileNameStr, BackupFileContent)
|
|
end.
|
|
|
|
-spec list_files() -> [backup_file_info()].
|
|
list_files() ->
|
|
Filter =
|
|
fun(File) ->
|
|
case file:read_file_info(File, [{time, posix}]) of
|
|
{ok, #file_info{size = Size, ctime = CTimeSec}} ->
|
|
BaseFilename = bin(filename:basename(File)),
|
|
Info = #{
|
|
filename => BaseFilename,
|
|
size => Size,
|
|
created_at => emqx_utils_calendar:epoch_to_rfc3339(CTimeSec, second),
|
|
created_at_sec => CTimeSec,
|
|
node => node()
|
|
},
|
|
{true, Info};
|
|
_ ->
|
|
false
|
|
end
|
|
end,
|
|
lists:filtermap(Filter, backup_files()).
|
|
|
|
backup_files() ->
|
|
filelib:wildcard(?backup_path("*" ++ ?TAR_SUFFIX)).
|
|
|
|
format_error(not_core_node) ->
|
|
str(
|
|
io_lib:format(
|
|
"backup data import is only allowed on core EMQX nodes, but requested node ~p is not core",
|
|
[node()]
|
|
)
|
|
);
|
|
format_error(ee_to_ce_backup) ->
|
|
"importing EMQX Enterprise data backup to EMQX is not allowed";
|
|
format_error(missing_backup_meta) ->
|
|
"invalid backup archive file: missing " ?META_FILENAME;
|
|
format_error(invalid_edition) ->
|
|
"invalid backup archive content: wrong EMQX edition value in " ?META_FILENAME;
|
|
format_error(invalid_version) ->
|
|
"invalid backup archive content: wrong EMQX version value in " ?META_FILENAME;
|
|
format_error(bad_archive_dir) ->
|
|
"invalid backup archive content: all files in the archive must be under <backup name> directory";
|
|
format_error(not_found) ->
|
|
"backup file not found";
|
|
format_error(bad_backup_name) ->
|
|
"invalid backup name: file name must have " ?TAR_SUFFIX " extension";
|
|
format_error({unsupported_version, ImportVersion}) ->
|
|
str(
|
|
io_lib:format(
|
|
"[warning] Backup version ~p is newer than EMQX version ~p, import is not allowed.~n",
|
|
[str(ImportVersion), str(emqx_release:version())]
|
|
)
|
|
);
|
|
format_error({already_exists, BackupFileName}) ->
|
|
str(io_lib:format("Backup file \"~s\" already exists", [BackupFileName]));
|
|
format_error(Reason) ->
|
|
Reason.
|
|
|
|
format_conf_errors(Errors) ->
|
|
Opts = #{print_fun => fun io_lib:format/2},
|
|
maps:values(maps:map(conf_error_formatter(Opts), Errors)).
|
|
|
|
format_db_errors(Errors) ->
|
|
Opts = #{print_fun => fun io_lib:format/2},
|
|
maps:values(
|
|
maps:map(
|
|
fun(Tab, Err) -> maybe_print_mnesia_import_err(Tab, Err, Opts) end,
|
|
Errors
|
|
)
|
|
).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
copy_and_import(FileNode, BackupFileName) ->
|
|
case emqx_mgmt_data_backup_proto_v1:read_file(FileNode, BackupFileName, infinity) of
|
|
{ok, BackupFileContent} ->
|
|
case upload(BackupFileName, BackupFileContent) of
|
|
ok ->
|
|
import(BackupFileName, #{});
|
|
Err ->
|
|
Err
|
|
end;
|
|
Err ->
|
|
Err
|
|
end.
|
|
|
|
%% compatibility with import API that uses lookup_file/1 and returns `not_found` reason
|
|
maybe_not_found({error, enoent}) ->
|
|
{error, not_found};
|
|
maybe_not_found(Other) ->
|
|
Other.
|
|
|
|
do_upload(BackupFileNameStr, BackupFileContent) ->
|
|
FilePath = ?backup_path(BackupFileNameStr),
|
|
BackupDir = ?backup_path(filename:basename(BackupFileNameStr, ?TAR_SUFFIX)),
|
|
try
|
|
ok = validate_backup_name(BackupFileNameStr),
|
|
ok = file:write_file(FilePath, BackupFileContent),
|
|
ok = extract_backup(FilePath),
|
|
{ok, _} = validate_backup(BackupDir),
|
|
HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
|
|
case filelib:is_regular(HoconFileName) of
|
|
true ->
|
|
{ok, RawConf} = hocon:files([HoconFileName]),
|
|
RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
|
|
{ok, _} = validate_cluster_hocon(RawConf1),
|
|
ok;
|
|
false ->
|
|
%% cluster.hocon can be missing in the backup
|
|
ok
|
|
end,
|
|
?SLOG(info, #{msg => "emqx_data_upload_success"})
|
|
catch
|
|
error:{badmatch, {error, Reason}}:Stack ->
|
|
?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}),
|
|
_ = file:delete(FilePath),
|
|
{error, Reason};
|
|
Class:Reason:Stack ->
|
|
_ = file:delete(FilePath),
|
|
?SLOG(error, #{
|
|
msg => "emqx_data_upload_failed",
|
|
exception => Class,
|
|
reason => Reason,
|
|
stacktrace => Stack
|
|
}),
|
|
{error, Reason}
|
|
after
|
|
file:del_dir_r(BackupDir)
|
|
end.
|
|
|
|
prepare_new_backup(Opts) ->
|
|
Ts = erlang:system_time(millisecond),
|
|
{{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
|
|
BackupBaseName = str(
|
|
io_lib:format(
|
|
"emqx-export-~0p-~2..0b-~2..0b-~2..0b-~2..0b-~2..0b.~3..0b",
|
|
[Y, M, D, HH, MM, SS, Ts rem 1000]
|
|
)
|
|
),
|
|
BackupName = ?backup_path(BackupBaseName),
|
|
BackupTarName = ?tar(BackupName),
|
|
maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
|
|
{ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
|
|
{BackupName, TarDescriptor}.
|
|
|
|
do_export(BackupName, TarDescriptor, Opts) ->
|
|
BackupBaseName = filename:basename(BackupName),
|
|
BackupTarName = ?tar(BackupName),
|
|
Meta = #{
|
|
version => emqx_release:version(),
|
|
edition => emqx_release:edition()
|
|
},
|
|
MetaBin = bin(hocon_pp:do(Meta, #{})),
|
|
MetaFileName = filename:join(BackupBaseName, ?META_FILENAME),
|
|
|
|
ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MetaBin, MetaFileName, [])),
|
|
ok = export_cluster_hocon(TarDescriptor, BackupBaseName, Opts),
|
|
ok = export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts),
|
|
ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
|
|
{ok, #file_info{
|
|
size = Size,
|
|
ctime = CTime
|
|
}} = file:read_file_info(BackupTarName, [{time, posix}]),
|
|
{ok, #{
|
|
filename => bin(BackupTarName),
|
|
size => Size,
|
|
created_at => emqx_utils_calendar:epoch_to_rfc3339(CTime, second),
|
|
created_at_sec => CTime,
|
|
node => node()
|
|
}}.
|
|
|
|
export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) ->
|
|
maybe_print("Exporting cluster configuration...~n", [], Opts),
|
|
RawConf = emqx_config:read_override_conf(#{override_to => cluster}),
|
|
maybe_print(
|
|
"Exporting additional files from EMQX data_dir: ~p...~n", [str(emqx:data_dir())], Opts
|
|
),
|
|
RawConf1 = read_data_files(RawConf),
|
|
RawConfBin = bin(hocon_pp:do(RawConf1, #{})),
|
|
NameInArchive = filename:join(BackupBaseName, ?CLUSTER_HOCON_FILENAME),
|
|
ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, RawConfBin, NameInArchive, [])).
|
|
|
|
export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) ->
|
|
maybe_print("Exporting built-in database...~n", [], Opts),
|
|
lists:foreach(
|
|
fun(Mod) ->
|
|
Tabs = Mod:backup_tables(),
|
|
lists:foreach(
|
|
fun(Tab) ->
|
|
export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts)
|
|
end,
|
|
Tabs
|
|
)
|
|
end,
|
|
tabs_to_backup()
|
|
).
|
|
|
|
export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
|
|
maybe_print("Exporting ~p database table...~n", [TabName], Opts),
|
|
{ok, MnesiaBackupName} = do_export_mnesia_tab(TabName, BackupName),
|
|
NameInArchive = mnesia_backup_name(BackupBaseName, TabName),
|
|
ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MnesiaBackupName, NameInArchive, [])),
|
|
_ = file:delete(MnesiaBackupName),
|
|
ok.
|
|
|
|
do_export_mnesia_tab(TabName, BackupName) ->
|
|
Node = node(),
|
|
try
|
|
Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}],
|
|
Opts =
|
|
case mnesia:table_info(TabName, storage_type) of
|
|
ram_copies -> [{ram_overrides_dump, true} | Opts0];
|
|
_ -> Opts0
|
|
end,
|
|
{ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts),
|
|
MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
|
|
ok = filelib:ensure_dir(MnesiaBackupName),
|
|
ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
|
|
{ok, MnesiaBackupName}
|
|
after
|
|
mnesia:deactivate_checkpoint(TabName)
|
|
end.
|
|
|
|
-ifdef(TEST).
|
|
tabs_to_backup() ->
|
|
%% Allow mocking in tests
|
|
?MODULE:mnesia_tabs_to_backup().
|
|
-else.
|
|
tabs_to_backup() ->
|
|
mnesia_tabs_to_backup().
|
|
-endif.
|
|
|
|
mnesia_tabs_to_backup() ->
|
|
lists:flatten([M || M <- find_behaviours(emqx_db_backup)]).
|
|
|
|
mnesia_backup_name(Path, TabName) ->
|
|
filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]).
|
|
|
|
is_import_allowed() ->
|
|
mria_rlog:role() =:= core.
|
|
|
|
validate_backup(BackupDir) ->
|
|
case hocon:files([filename:join(BackupDir, ?META_FILENAME)]) of
|
|
{ok, #{
|
|
<<"edition">> := Edition,
|
|
<<"version">> := Version
|
|
}} = Meta ->
|
|
validate(
|
|
[
|
|
fun() -> check_edition(Edition) end,
|
|
fun() -> check_version(Version) end
|
|
],
|
|
Meta
|
|
);
|
|
_ ->
|
|
?SLOG(error, #{msg => "missing_backup_meta", backup => BackupDir}),
|
|
{error, missing_backup_meta}
|
|
end.
|
|
|
|
validate([ValidatorFun | T], OkRes) ->
|
|
case ValidatorFun() of
|
|
ok -> validate(T, OkRes);
|
|
Err -> Err
|
|
end;
|
|
validate([], OkRes) ->
|
|
OkRes.
|
|
|
|
check_edition(BackupEdition) when BackupEdition =:= <<"ce">>; BackupEdition =:= <<"ee">> ->
|
|
Edition = bin(emqx_release:edition()),
|
|
case {BackupEdition, Edition} of
|
|
{<<"ee">>, <<"ce">>} ->
|
|
{error, ee_to_ce_backup};
|
|
_ ->
|
|
ok
|
|
end;
|
|
check_edition(BackupEdition) ->
|
|
?SLOG(error, #{msg => "invalid_backup_edition", edition => BackupEdition}),
|
|
{error, invalid_edition}.
|
|
|
|
check_version(ImportVersion) ->
|
|
case parse_version_no_patch(ImportVersion) of
|
|
{ok, {ImportMajorInt, ImportMinorInt}} ->
|
|
Version = emqx_release:version(),
|
|
{ok, {MajorInt, MinorInt}} = parse_version_no_patch(bin(Version)),
|
|
case ImportMajorInt > MajorInt orelse ImportMinorInt > MinorInt of
|
|
true ->
|
|
%% 4.x backup files are anyway not compatible and will be treated as invalid,
|
|
%% before this step,
|
|
{error, {unsupported_version, str(ImportVersion)}};
|
|
false ->
|
|
ok
|
|
end;
|
|
Err ->
|
|
Err
|
|
end.
|
|
|
|
parse_version_no_patch(VersionBin) ->
|
|
case string:split(VersionBin, ".", all) of
|
|
[Major, Minor | _] ->
|
|
{MajorInt, _} = emqx_utils_binary:bin_to_int(Major),
|
|
{MinorInt, _} = emqx_utils_binary:bin_to_int(Minor),
|
|
{ok, {MajorInt, MinorInt}};
|
|
_ ->
|
|
?SLOG(error, #{msg => "failed_to_parse_backup_version", version => VersionBin}),
|
|
{error, invalid_version}
|
|
end.
|
|
|
|
do_import(BackupFileName, Opts) ->
|
|
BackupDir = ?backup_path(filename:basename(BackupFileName, ?TAR_SUFFIX)),
|
|
maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
|
|
try
|
|
ok = validate_backup_name(BackupFileName),
|
|
ok = extract_backup(BackupFileName),
|
|
{ok, _} = validate_backup(BackupDir),
|
|
ConfErrors = import_cluster_hocon(BackupDir, Opts),
|
|
MnesiaErrors = import_mnesia_tabs(BackupDir, Opts),
|
|
?SLOG(info, #{msg => "emqx_data_import_success"}),
|
|
{ok, #{db_errors => MnesiaErrors, config_errors => ConfErrors}}
|
|
catch
|
|
error:{badmatch, {error, Reason}}:Stack ->
|
|
?SLOG(error, #{msg => "emqx_data_import_failed", reason => Reason, stacktrace => Stack}),
|
|
{error, Reason};
|
|
Class:Reason:Stack ->
|
|
?SLOG(error, #{
|
|
msg => "emqx_data_import_failed",
|
|
exception => Class,
|
|
reason => Reason,
|
|
stacktrace => Stack
|
|
}),
|
|
{error, Reason}
|
|
after
|
|
file:del_dir_r(BackupDir)
|
|
end.
|
|
|
|
import_mnesia_tabs(BackupDir, Opts) ->
|
|
maybe_print("Importing built-in database...~n", [], Opts),
|
|
filter_errors(
|
|
lists:foldr(
|
|
fun(Mod, Acc) ->
|
|
Tabs = Mod:backup_tables(),
|
|
lists:foldr(
|
|
fun(Tab, InAcc) ->
|
|
InAcc#{Tab => import_mnesia_tab(BackupDir, Mod, Tab, Opts)}
|
|
end,
|
|
Acc,
|
|
Tabs
|
|
)
|
|
end,
|
|
#{},
|
|
tabs_to_backup()
|
|
)
|
|
).
|
|
|
|
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
|
|
ok | {ok, no_backup_file} | {error, term()} | no_return().
|
|
import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
|
|
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
|
|
case filelib:is_regular(MnesiaBackupFileName) of
|
|
true ->
|
|
maybe_print("Importing ~p database table...~n", [TabName], Opts),
|
|
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts);
|
|
false ->
|
|
maybe_print("No backup file for ~p database table...~n", [TabName], Opts),
|
|
?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}),
|
|
ok
|
|
end.
|
|
|
|
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
|
|
Validated = validate_mnesia_backup(MnesiaBackupFileName, Mod),
|
|
try
|
|
case Validated of
|
|
{ok, #{backup_file := BackupFile}} ->
|
|
%% As we use keep_tables option, we don't need to modify 'copies' (nodes)
|
|
%% in a backup file before restoring it, as `mnsia:restore/2` will ignore
|
|
%% backed-up schema and keep the current table schema unchanged
|
|
Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
|
|
case Restored of
|
|
{atomic, [TabName]} ->
|
|
on_table_imported(Mod, TabName, Opts);
|
|
RestoreErr ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_restore_mnesia_backup",
|
|
table => TabName,
|
|
backup => BackupDir,
|
|
reason => RestoreErr
|
|
}),
|
|
maybe_print_mnesia_import_err(TabName, RestoreErr, Opts),
|
|
{error, RestoreErr}
|
|
end;
|
|
PrepareErr ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_prepare_mnesia_backup_for_restoring",
|
|
table => TabName,
|
|
backup => BackupDir,
|
|
reason => PrepareErr
|
|
}),
|
|
maybe_print_mnesia_import_err(TabName, PrepareErr, Opts),
|
|
PrepareErr
|
|
end
|
|
after
|
|
%% Cleanup files as soon as they are not needed any more for more efficient disk usage
|
|
_ = file:delete(MnesiaBackupFileName)
|
|
end.
|
|
|
|
on_table_imported(Mod, Tab, Opts) ->
|
|
case erlang:function_exported(Mod, on_backup_table_imported, 2) of
|
|
true ->
|
|
try
|
|
Mod:on_backup_table_imported(Tab, Opts)
|
|
catch
|
|
Class:Reason:Stack ->
|
|
?SLOG(error, #{
|
|
msg => "post_database_import_callback_failed",
|
|
table => Tab,
|
|
module => Mod,
|
|
exception => Class,
|
|
reason => Reason,
|
|
stacktrace => Stack
|
|
}),
|
|
{error, Reason}
|
|
end;
|
|
false ->
|
|
ok
|
|
end.
|
|
|
|
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
|
|
%% Looks like there is no clean way to abort traversal without triggering any error reporting,
|
|
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...
|
|
validate_mnesia_backup(MnesiaBackupFileName, Mod) ->
|
|
Init = #{backup_file => MnesiaBackupFileName},
|
|
Validated =
|
|
catch mnesia:traverse_backup(
|
|
MnesiaBackupFileName,
|
|
mnesia_backup,
|
|
dummy,
|
|
read_only,
|
|
mnesia_backup_validator(Mod),
|
|
Init
|
|
),
|
|
case Validated of
|
|
ok ->
|
|
{ok, Init};
|
|
{error, {_, over}} ->
|
|
{ok, Init};
|
|
{error, {_, migrate}} ->
|
|
migrate_mnesia_backup(MnesiaBackupFileName, Mod, Init);
|
|
Error ->
|
|
Error
|
|
end.
|
|
|
|
%% if the module has validator callback, use it else use the default
|
|
mnesia_backup_validator(Mod) ->
|
|
Validator =
|
|
case erlang:function_exported(Mod, validate_mnesia_backup, 1) of
|
|
true ->
|
|
fun Mod:validate_mnesia_backup/1;
|
|
_ ->
|
|
fun default_validate_mnesia_backup/1
|
|
end,
|
|
fun(Schema, Acc) ->
|
|
case Validator(Schema) of
|
|
ok ->
|
|
{[Schema], Acc};
|
|
{ok, Break} ->
|
|
throw({error, Break});
|
|
Error ->
|
|
throw(Error)
|
|
end
|
|
end.
|
|
|
|
default_validate_mnesia_backup({schema, Tab, CreateList}) ->
|
|
ImportAttributes = proplists:get_value(attributes, CreateList),
|
|
Attributes = mnesia:table_info(Tab, attributes),
|
|
case ImportAttributes == Attributes of
|
|
true ->
|
|
ok;
|
|
false ->
|
|
{error, different_table_schema}
|
|
end;
|
|
default_validate_mnesia_backup(_Other) ->
|
|
ok.
|
|
|
|
migrate_mnesia_backup(MnesiaBackupFileName, Mod, Acc) ->
|
|
case erlang:function_exported(Mod, migrate_mnesia_backup, 1) of
|
|
true ->
|
|
MigrateFile = MnesiaBackupFileName ++ ".migrate",
|
|
Migrator = fun(Schema, InAcc) ->
|
|
case Mod:migrate_mnesia_backup(Schema) of
|
|
{ok, NewSchema} ->
|
|
{[NewSchema], InAcc};
|
|
Error ->
|
|
throw(Error)
|
|
end
|
|
end,
|
|
catch mnesia:traverse_backup(
|
|
MnesiaBackupFileName,
|
|
MigrateFile,
|
|
Migrator,
|
|
Acc#{backup_file := MigrateFile}
|
|
);
|
|
_ ->
|
|
{error, no_migrator}
|
|
end.
|
|
|
|
extract_backup(BackupFileName) ->
|
|
BackupDir = root_backup_dir(),
|
|
ok = validate_filenames(BackupFileName),
|
|
?fmt_tar_err(erl_tar:extract(BackupFileName, [{cwd, BackupDir}, compressed])).
|
|
|
|
validate_filenames(BackupFileName) ->
|
|
{ok, FileNames} = ?fmt_tar_err(erl_tar:table(BackupFileName, [compressed])),
|
|
BackupName = filename:basename(BackupFileName, ?TAR_SUFFIX),
|
|
IsValid = lists:all(
|
|
fun(FileName) ->
|
|
[Root | _] = filename:split(FileName),
|
|
Root =:= BackupName
|
|
end,
|
|
FileNames
|
|
),
|
|
case IsValid of
|
|
true -> ok;
|
|
false -> {error, bad_archive_dir}
|
|
end.
|
|
|
|
import_cluster_hocon(BackupDir, Opts) ->
|
|
HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
|
|
case filelib:is_regular(HoconFileName) of
|
|
true ->
|
|
{ok, RawConf} = hocon:files([HoconFileName]),
|
|
RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
|
|
{ok, _} = validate_cluster_hocon(RawConf1),
|
|
maybe_print("Importing cluster configuration...~n", [], Opts),
|
|
%% At this point, when all validations have been passed, we want to log errors (if any)
|
|
%% but proceed with the next items, instead of aborting the whole import operation
|
|
do_import_conf(RawConf1, Opts);
|
|
false ->
|
|
maybe_print("No cluster configuration to be imported.~n", [], Opts),
|
|
?SLOG(info, #{
|
|
msg => "no_backup_hocon_config_to_import",
|
|
backup => BackupDir
|
|
}),
|
|
#{}
|
|
end.
|
|
|
|
upgrade_raw_conf(SchemaMod, RawConf) ->
|
|
_ = SchemaMod:module_info(),
|
|
case erlang:function_exported(SchemaMod, upgrade_raw_conf, 1) of
|
|
true ->
|
|
%% TODO make it a schema module behaviour in hocon_schema
|
|
apply(SchemaMod, upgrade_raw_conf, [RawConf]);
|
|
false ->
|
|
RawConf
|
|
end.
|
|
|
|
read_data_files(RawConf) ->
|
|
DataDir = bin(emqx:data_dir()),
|
|
{ok, Cwd} = file:get_cwd(),
|
|
AbsDataDir = bin(filename:join(Cwd, DataDir)),
|
|
RawConf1 = emqx_authz:maybe_read_files(RawConf),
|
|
emqx_utils_maps:deep_convert(RawConf1, fun read_data_file/4, [DataDir, AbsDataDir]).
|
|
|
|
-define(dir_pattern(_Dir_), <<_Dir_:(byte_size(_Dir_))/binary, _/binary>>).
|
|
|
|
read_data_file(Key, Val, DataDir, AbsDataDir) ->
|
|
Val1 =
|
|
case Val of
|
|
?dir_pattern(DataDir) = FileName ->
|
|
do_read_file(FileName);
|
|
?dir_pattern(AbsDataDir) = FileName ->
|
|
do_read_file(FileName);
|
|
V ->
|
|
V
|
|
end,
|
|
{Key, Val1}.
|
|
|
|
do_read_file(FileName) ->
|
|
case file:read_file(FileName) of
|
|
{ok, Content} ->
|
|
Content;
|
|
{error, Reason} ->
|
|
?SLOG(warning, #{
|
|
msg => "failed_to_read_data_file",
|
|
filename => FileName,
|
|
reason => Reason
|
|
}),
|
|
FileName
|
|
end.
|
|
|
|
validate_cluster_hocon(RawConf) ->
|
|
%% write ACL file to comply with the schema...
|
|
RawConf1 = emqx_authz:maybe_write_files(RawConf),
|
|
emqx_hocon:check(
|
|
emqx_conf:schema_module(),
|
|
maps:merge(emqx:get_raw_config([]), RawConf1),
|
|
#{atom_key => false, required => false}
|
|
).
|
|
|
|
do_import_conf(RawConf, Opts) ->
|
|
GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
|
|
maybe_print_conf_errors(GenConfErrs, Opts),
|
|
Modules = sort_importer_modules(find_behaviours(emqx_config_backup)),
|
|
Errors = lists:foldl(print_ok_results_collect_errors(RawConf, Opts), GenConfErrs, Modules),
|
|
maybe_print_conf_errors(Errors, Opts),
|
|
Errors.
|
|
|
|
print_ok_results_collect_errors(RawConf, Opts) ->
|
|
fun(Module, Errors) ->
|
|
case Module:import_config(RawConf) of
|
|
{results, {OkResults, ErrResults}} ->
|
|
print_ok_results(OkResults, Opts),
|
|
collect_errors(ErrResults, Errors);
|
|
{ok, OkResult} ->
|
|
print_ok_results([OkResult], Opts),
|
|
Errors;
|
|
{error, ErrResult} ->
|
|
collect_errors([ErrResult], Errors)
|
|
end
|
|
end.
|
|
|
|
print_ok_results(Results, Opts) ->
|
|
lists:foreach(
|
|
fun(#{changed := Changed}) ->
|
|
maybe_print_changed(Changed, Opts)
|
|
end,
|
|
Results
|
|
).
|
|
|
|
collect_errors(Results, Errors) ->
|
|
lists:foldr(
|
|
fun(#{root_key := RootKey, reason := Reason}, Acc) ->
|
|
Acc#{[RootKey] => Reason}
|
|
end,
|
|
Errors,
|
|
Results
|
|
).
|
|
|
|
sort_importer_modules(Modules) ->
|
|
lists:sort(
|
|
fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end,
|
|
Modules
|
|
).
|
|
|
|
order(Elem, List) ->
|
|
order(Elem, List, 0).
|
|
|
|
order(_Elem, [], Order) ->
|
|
Order;
|
|
order(Elem, [Elem | _], Order) ->
|
|
Order;
|
|
order(Elem, [_ | T], Order) ->
|
|
order(Elem, T, Order + 1).
|
|
|
|
import_generic_conf(Data) ->
|
|
lists:map(
|
|
fun(KeyPath) ->
|
|
case emqx_utils_maps:deep_get(KeyPath, Data, undefined) of
|
|
undefined -> {[KeyPath], ok};
|
|
Conf -> {[KeyPath], emqx_conf:update(KeyPath, Conf, #{override_to => cluster})}
|
|
end
|
|
end,
|
|
?CONF_KEYS
|
|
).
|
|
|
|
maybe_print_changed(Changed, Opts) ->
|
|
lists:foreach(
|
|
fun(ChangedPath) ->
|
|
maybe_print(
|
|
"Config key path ~p was present before import and "
|
|
"has been overwritten.~n",
|
|
[pretty_path(ChangedPath)],
|
|
Opts
|
|
)
|
|
end,
|
|
Changed
|
|
).
|
|
|
|
maybe_print_conf_errors(Errors, Opts) ->
|
|
maps:foreach(conf_error_formatter(Opts), Errors).
|
|
|
|
conf_error_formatter(Opts) ->
|
|
fun(Path, Err) ->
|
|
maybe_print(
|
|
"Failed to import the following config path: ~p, reason: ~p~n",
|
|
[pretty_path(Path), Err],
|
|
Opts
|
|
)
|
|
end.
|
|
|
|
filter_errors(Results) ->
|
|
maps:filter(
|
|
fun
|
|
(_Path, {error, _}) -> true;
|
|
(_, _) -> false
|
|
end,
|
|
Results
|
|
).
|
|
|
|
pretty_path(Path) ->
|
|
str(lists:join(".", [str(Part) || Part <- Path])).
|
|
|
|
str(Data) when is_atom(Data) ->
|
|
atom_to_list(Data);
|
|
str(Data) ->
|
|
unicode:characters_to_list(Data).
|
|
|
|
bin(Data) when is_atom(Data) ->
|
|
atom_to_binary(Data, utf8);
|
|
bin(Data) ->
|
|
unicode:characters_to_binary(Data).
|
|
|
|
validate_backup_name(FileName) ->
|
|
BaseName = filename:basename(FileName, ?TAR_SUFFIX),
|
|
ValidName = BaseName ++ ?TAR_SUFFIX,
|
|
case filename:basename(FileName) of
|
|
ValidName -> ok;
|
|
_ -> {error, bad_backup_name}
|
|
end.
|
|
|
|
lookup_file(FileName) ->
|
|
case filelib:is_regular(FileName) of
|
|
true ->
|
|
{ok, FileName};
|
|
false ->
|
|
%% Only lookup by basename, don't allow to lookup by file path
|
|
case FileName =:= filename:basename(FileName) of
|
|
true ->
|
|
FilePath = ?backup_path(FileName),
|
|
case filelib:is_file(FilePath) of
|
|
true -> {ok, FilePath};
|
|
false -> {error, not_found}
|
|
end;
|
|
false ->
|
|
{error, not_found}
|
|
end
|
|
end.
|
|
|
|
root_backup_dir() ->
|
|
Dir = filename:join(emqx:data_dir(), ?ROOT_BACKUP_DIR),
|
|
ok = ensure_path(Dir),
|
|
Dir.
|
|
|
|
-if(?OTP_RELEASE < 25).
|
|
ensure_path(Path) -> filelib:ensure_dir(filename:join([Path, "dummy"])).
|
|
-else.
|
|
ensure_path(Path) -> filelib:ensure_path(Path).
|
|
-endif.
|
|
|
|
local_datetime(MillisecondTs) ->
|
|
calendar:system_time_to_local_time(MillisecondTs, millisecond).
|
|
|
|
maybe_print(Format, Args, #{print_fun := PrintFun}) ->
|
|
PrintFun(Format, Args);
|
|
maybe_print(_Format, _Args, _Opts) ->
|
|
ok.
|
|
|
|
maybe_print_mnesia_import_err(TabName, Error, Opts) ->
|
|
maybe_print(
|
|
"[error] Failed to import built-in database table: ~p, reason: ~p~n",
|
|
[TabName, Error],
|
|
Opts
|
|
).
|
|
|
|
find_behaviours(Behaviour) ->
|
|
find_behaviours(Behaviour, apps(), []).
|
|
|
|
%% Based on minirest_api:find_api_modules/1
|
|
find_behaviours(_Behaviour, [] = _Apps, Acc) ->
|
|
Acc;
|
|
find_behaviours(Behaviour, [App | Apps], Acc) ->
|
|
case application:get_key(App, modules) of
|
|
undefined ->
|
|
Acc;
|
|
{ok, Modules} ->
|
|
NewAcc = lists:filter(
|
|
fun(Module) ->
|
|
Info = Module:module_info(attributes),
|
|
Bhvrs = lists:flatten(
|
|
proplists:get_all_values(behavior, Info) ++
|
|
proplists:get_all_values(behaviour, Info)
|
|
),
|
|
lists:member(Behaviour, Bhvrs)
|
|
end,
|
|
Modules
|
|
),
|
|
find_behaviours(Behaviour, Apps, NewAcc ++ Acc)
|
|
end.
|
|
|
|
apps() ->
|
|
[
|
|
App
|
|
|| {App, _, _} <- application:loaded_applications(),
|
|
case re:run(atom_to_list(App), "^emqx") of
|
|
{match, [{0, 4}]} -> true;
|
|
_ -> false
|
|
end
|
|
].
|