diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index ea4ce159d..e2c726fe5 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -39,6 +39,7 @@ {emqx_mgmt_api_plugins,2}. {emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,2}. +{emqx_mgmt_data_backup,1}. {emqx_mgmt_trace,1}. {emqx_mgmt_trace,2}. {emqx_node_rebalance,1}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl new file mode 100644 index 000000000..4cd7d404e --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_data_backup.erl @@ -0,0 +1,361 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_api_data_backup). + +-behaviour(minirest_api). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([api_spec/0, paths/0, schema/1, fields/1]). + +-export([ + data_export/2, + data_import/2, + data_files/2, + data_file_by_name/2 +]). + +-define(TAGS, [<<"Data Backup">>]). + +-define(BAD_REQUEST, 'BAD_REQUEST'). +-define(NOT_FOUND, 'NOT_FOUND'). + +-define(node_field(IsRequired), ?node_field(IsRequired, #{})). +-define(node_field(IsRequired, Meta), + {node, ?HOCON(binary(), Meta#{desc => "Node name", required => IsRequired})} +). +-define(filename_field(IsRequired), ?filename_field(IsRequired, #{})). +-define(filename_field(IsRequired, Meta), + {filename, + ?HOCON(binary(), Meta#{ + desc => "Data backup file name", + required => IsRequired + })} +). + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/data/export", + "/data/import", + "/data/files", + "/data/files/:filename" + ]. + +schema("/data/export") -> + #{ + 'operationId' => data_export, + post => #{ + tags => ?TAGS, + desc => <<"Export a data backup file">>, + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example( + ?R_REF(backup_file_info), + backup_file_info_example() + ) + } + } + }; +schema("/data/import") -> + #{ + 'operationId' => data_import, + post => #{ + tags => ?TAGS, + desc => <<"Import a data backup file">>, + 'requestBody' => emqx_dashboard_swagger:schema_with_example( + ?R_REF(import_request_body), + maps:with([node, filename], backup_file_info_example()) + ), + + responses => #{ + 204 => <<"No Content">>, + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], <<"Backup file import failed">> + ) + } + } + }; +schema("/data/files") -> + #{ + 'operationId' => data_files, + post => #{ + tags => ?TAGS, + desc => <<"Upload a data backup file">>, + 'requestBody' => emqx_dashboard_swagger:file_schema(filename), + responses => #{ + 204 => <<"No Content">>, + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], <<"Bad backup file">> + ) + } + }, + get => #{ + tags => ?TAGS, + desc => <<"List backup files">>, + parameters => [ + ?R_REF(emqx_dashboard_swagger, page), + ?R_REF(emqx_dashboard_swagger, limit) + ], + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example( + ?R_REF(files_response), + files_response_example() + ) + } + } + }; +schema("/data/files/:filename") -> + #{ + 'operationId' => data_file_by_name, + get => #{ + tags => ?TAGS, + desc => <<"Download a data backup file">>, + parameters => [ + ?filename_field(true, #{in => path}), + ?node_field(false, #{in => query}) + ], + responses => #{ + 200 => ?HOCON(binary), + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], <<"Bad request">> + ), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Backup file not found">> + ) + } + }, + delete => #{ + tags => ?TAGS, + desc => <<"Delete a data backup file">>, + parameters => [ + ?filename_field(true, #{in => path}), + ?node_field(false, #{in => query}) + ], + responses => #{ + 204 => <<"No Content">>, + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], <<"Bad request">> + ), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Backup file not found">> + ) + } + } + }. + +fields(files_response) -> + [ + {data, ?ARRAY(?R_REF(backup_file_info))}, + {meta, ?R_REF(emqx_dashboard_swagger, meta)} + ]; +fields(backup_file_info) -> + [ + ?node_field(true), + ?filename_field(true), + {created_at, + ?HOCON(binary(), #{ + desc => "Data backup file creation date and time", + required => true + })} + ]; +fields(import_request_body) -> + [?node_field(false), ?filename_field(true)]; +fields(data_backup_file) -> + [ + ?filename_field(true), + {file, + ?HOCON(binary(), #{ + desc => "Data backup file content", + required => true + })} + ]. + +%%------------------------------------------------------------------------------ +%% HTTP API Callbacks +%%------------------------------------------------------------------------------ + +data_export(post, _Request) -> + case emqx_mgmt_data_backup:export() of + {ok, #{filename := FileName} = File} -> + {200, File#{filename => filename:basename(FileName)}}; + Error -> + Error + end. + +data_import(post, #{body := #{<<"filename">> := FileName} = Body}) -> + case safe_parse_node(Body) of + {error, Msg} -> + {400, #{code => 'BAD_REQUEST', message => Msg}}; + FileNode -> + CoreNode = core_node(FileNode), + response( + emqx_mgmt_data_backup_proto_v1:import_file(CoreNode, FileNode, FileName, infinity) + ) + end. + +core_node(FileNode) -> + case mria_rlog:role(FileNode) of + core -> + FileNode; + replicant -> + case mria_rlog:role() of + core -> + node(); + replicant -> + mria_membership:coordinator() + end + end. + +data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) -> + [{FileName, FileContent} | _] = maps:to_list(maps:without([type], File)), + case emqx_mgmt_data_backup:upload(FileName, FileContent) of + ok -> + {204}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}} + end; +data_files(get, #{query_string := PageParams}) -> + case emqx_mgmt_api:parse_pager_params(PageParams) of + false -> + {400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}}; + #{page := Page, limit := Limit} = Pager -> + {200, #{data => list_backup_files(Page, Limit), meta => Pager}} + end. + +data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) -> + case safe_parse_node(QS) of + {error, Msg} -> + {400, #{code => 'BAD_REQUEST', message => Msg}}; + Node -> + case get_or_delete_file(Method, Filename, Node) of + {error, not_found} -> + {404, #{ + code => ?NOT_FOUND, message => emqx_mgmt_data_backup:format_error(not_found) + }}; + Other -> + response(Other) + end + end. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +get_or_delete_file(get, Filename, Node) -> + emqx_mgmt_data_backup_proto_v1:read_file(Node, Filename, infinity); +get_or_delete_file(delete, Filename, Node) -> + emqx_mgmt_data_backup_proto_v1:delete_file(Node, Filename, infinity). + +safe_parse_node(#{<<"node">> := NodeBin}) -> + NodesBin = [erlang:atom_to_binary(N, utf8) || N <- emqx:running_nodes()], + case lists:member(NodeBin, NodesBin) of + true -> erlang:binary_to_atom(NodeBin, utf8); + false -> {error, io_lib:format("Unknown node: ~s", [NodeBin])} + end; +safe_parse_node(_) -> + node(). + +response({ok, #{db_errors := DbErrs, config_errors := ConfErrs}}) -> + case DbErrs =:= #{} andalso ConfErrs =:= #{} of + true -> + {204}; + false -> + DbErrs1 = emqx_mgmt_data_backup:format_db_errors(DbErrs), + ConfErrs1 = emqx_mgmt_data_backup:format_conf_errors(ConfErrs), + Msg = unicode:characters_to_binary(io_lib:format("~s", [DbErrs1 ++ ConfErrs1])), + {400, #{code => ?BAD_REQUEST, message => Msg}} + end; +response({ok, Res}) -> + {200, Res}; +response(ok) -> + {204}; +response({error, Reason}) -> + {400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}}. + +list_backup_files(Page, Limit) -> + Start = Page * Limit - Limit + 1, + lists:sublist(list_backup_files(), Start, Limit). + +list_backup_files() -> + Nodes = emqx:running_nodes(), + Results = emqx_mgmt_data_backup_proto_v1:list_files(Nodes, 30_0000), + NodeResults = lists:zip(Nodes, Results), + {Successes, Failures} = + lists:partition( + fun({_Node, Result}) -> + case Result of + {ok, _} -> true; + _ -> false + end + end, + NodeResults + ), + case Failures of + [] -> + ok; + [_ | _] -> + ?SLOG(error, #{msg => "list_exported_backup_files_failed", node_errors => Failures}) + end, + FileList = [FileInfo || {_Node, {ok, FileInfos}} <- Successes, FileInfo <- FileInfos], + lists:sort( + fun(#{created_at_sec := T1, filename := F1}, #{created_at_sec := T2, filename := F2}) -> + case T1 =:= T2 of + true -> F1 >= F2; + false -> T1 > T2 + end + end, + FileList + ). + +backup_file_info_example() -> + #{ + created_at => <<"2023-11-23T19:13:19+02:00">>, + created_at_sec => 1700759599, + filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>, + node => 'emqx@127.0.0.1', + size => 22740 + }. + +files_response_example() -> + #{ + data => [ + #{ + created_at => <<"2023-09-02T11:11:33+02:00">>, + created_at_sec => 1693645893, + filename => <<"emqx-export-2023-09-02-11-11-33.012.tar.gz">>, + node => 'emqx@127.0.0.1', + size => 22740 + }, + #{ + created_at => <<"2023-11-23T19:13:19+02:00">>, + created_at_sec => 1700759599, + filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>, + node => 'emqx@127.0.0.1', + size => 22740 + } + ], + meta => #{ + page => 0, + limit => 20, + count => 300 + } + }. diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 9825b26cf..e0887d788 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -24,8 +24,21 @@ format_error/1 ]). +%% HTTP API +-export([ + upload/2, + maybe_copy_and_import/2, + read_file/1, + delete_file/1, + list_files/0, + format_conf_errors/1, + format_db_errors/1 +]). + -export([default_validate_mnesia_backup/1]). +-export_type([import_res/0]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -80,17 +93,21 @@ end end() ). +-define(backup_path(_FileName_), filename:join(root_backup_dir(), _FileName_)). -type backup_file_info() :: #{ - filename => binary(), - size => non_neg_integer(), - created_at => binary(), - node => node(), + filename := binary(), + size := non_neg_integer(), + created_at := binary(), + created_at_sec := integer(), + node := node(), atom() => _ }. -type db_error_details() :: #{mria:table() => {error, _}}. -type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}. +-type import_res() :: + {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} | {error, _}. %%------------------------------------------------------------------------------ %% APIs @@ -120,15 +137,11 @@ export(Opts) -> file:del_dir_r(BackupName) end. --spec import(file:filename_all()) -> - {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} - | {error, _}. +-spec import(file:filename_all()) -> import_res(). import(BackupFileName) -> import(BackupFileName, ?DEFAULT_OPTS). --spec import(file:filename_all(), map()) -> - {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} - | {error, _}. +-spec import(file:filename_all(), map()) -> import_res(). import(BackupFileName, Opts) -> case is_import_allowed() of true -> @@ -142,6 +155,74 @@ import(BackupFileName, Opts) -> {error, not_core_node} end. +-spec maybe_copy_and_import(node(), file:filename_all()) -> import_res(). +maybe_copy_and_import(FileNode, BackupFileName) when FileNode =:= node() -> + import(BackupFileName, #{}); +maybe_copy_and_import(FileNode, BackupFileName) -> + %% The file can be already present locally + case filelib:is_file(?backup_path(str(BackupFileName))) of + true -> + import(BackupFileName, #{}); + false -> + copy_and_import(FileNode, BackupFileName) + end. + +-spec read_file(file:filename_all()) -> + {ok, #{filename => file:filename_all(), file => binary()}} | {error, _}. +read_file(BackupFileName) -> + BackupFileNameStr = str(BackupFileName), + case validate_backup_name(BackupFileNameStr) of + ok -> + maybe_not_found(file:read_file(?backup_path(BackupFileName))); + Err -> + Err + end. + +-spec delete_file(file:filename_all()) -> ok | {error, _}. +delete_file(BackupFileName) -> + BackupFileNameStr = str(BackupFileName), + case validate_backup_name(BackupFileNameStr) of + ok -> + maybe_not_found(file:delete(?backup_path(BackupFileName))); + Err -> + Err + end. + +-spec upload(file:filename_all(), binary()) -> ok | {error, _}. +upload(BackupFileName, BackupFileContent) -> + BackupFileNameStr = str(BackupFileName), + FilePath = ?backup_path(BackupFileNameStr), + case filelib:is_file(FilePath) of + true -> + {error, {already_exists, BackupFileNameStr}}; + false -> + do_upload(BackupFileNameStr, BackupFileContent) + end. + +-spec list_files() -> [backup_file_info()]. +list_files() -> + Filter = + fun(File) -> + case file:read_file_info(File, [{time, posix}]) of + {ok, #file_info{size = Size, ctime = CTimeSec}} -> + BaseFilename = bin(filename:basename(File)), + Info = #{ + filename => BaseFilename, + size => Size, + created_at => emqx_utils_calendar:epoch_to_rfc3339(CTimeSec, second), + created_at_sec => CTimeSec, + node => node() + }, + {true, Info}; + _ -> + false + end + end, + lists:filtermap(Filter, backup_files()). + +backup_files() -> + filelib:wildcard(?backup_path("*" ++ ?TAR_SUFFIX)). + format_error(not_core_node) -> str( io_lib:format( @@ -170,13 +251,83 @@ format_error({unsupported_version, ImportVersion}) -> [str(ImportVersion), str(emqx_release:version())] ) ); +format_error({already_exists, BackupFileName}) -> + str(io_lib:format("Backup file \"~s\" already exists", [BackupFileName])); format_error(Reason) -> Reason. +format_conf_errors(Errors) -> + Opts = #{print_fun => fun io_lib:format/2}, + maps:values(maps:map(conf_error_formatter(Opts), Errors)). + +format_db_errors(Errors) -> + Opts = #{print_fun => fun io_lib:format/2}, + maps:values( + maps:map( + fun(Tab, Err) -> maybe_print_mnesia_import_err(Tab, Err, Opts) end, + Errors + ) + ). + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ +copy_and_import(FileNode, BackupFileName) -> + case emqx_mgmt_data_backup_proto_v1:read_file(FileNode, BackupFileName, infinity) of + {ok, BackupFileContent} -> + case upload(BackupFileName, BackupFileContent) of + ok -> + import(BackupFileName, #{}); + Err -> + Err + end; + Err -> + Err + end. + +%% compatibility with import API that uses lookup_file/1 and returns `not_found` reason +maybe_not_found({error, enoent}) -> + {error, not_found}; +maybe_not_found(Other) -> + Other. + +do_upload(BackupFileNameStr, BackupFileContent) -> + FilePath = ?backup_path(BackupFileNameStr), + BackupDir = ?backup_path(filename:basename(BackupFileNameStr, ?TAR_SUFFIX)), + try + ok = validate_backup_name(BackupFileNameStr), + ok = file:write_file(FilePath, BackupFileContent), + ok = extract_backup(FilePath), + {ok, _} = validate_backup(BackupDir), + HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME), + case filelib:is_regular(HoconFileName) of + true -> + {ok, RawConf} = hocon:files([HoconFileName]), + RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf), + {ok, _} = validate_cluster_hocon(RawConf1), + ok; + false -> + %% cluster.hocon can be missing in the backup + ok + end, + ?SLOG(info, #{msg => "emqx_data_upload_success"}) + catch + error:{badmatch, {error, Reason}}:Stack -> + ?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}), + {error, Reason}; + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "emqx_data_upload_failed", + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + after + file:del_dir_r(BackupDir) + end. + prepare_new_backup(Opts) -> Ts = erlang:system_time(millisecond), {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts), @@ -186,7 +337,7 @@ prepare_new_backup(Opts) -> [Y, M, D, HH, MM, SS, Ts rem 1000] ) ), - BackupName = filename:join(root_backup_dir(), BackupBaseName), + BackupName = ?backup_path(BackupBaseName), BackupTarName = ?tar(BackupName), maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts), {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])), @@ -208,13 +359,13 @@ do_export(BackupName, TarDescriptor, Opts) -> ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)), {ok, #file_info{ size = Size, - ctime = {{Y1, M1, D1}, {H1, MM1, S1}} - }} = file:read_file_info(BackupTarName), - CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]), + ctime = CTime + }} = file:read_file_info(BackupTarName, [{time, posix}]), {ok, #{ filename => bin(BackupTarName), size => Size, - created_at => bin(CreatedAt), + created_at => emqx_utils_calendar:epoch_to_rfc3339(CTime, second), + created_at_sec => CTime, node => node() }}. @@ -351,7 +502,7 @@ parse_version_no_patch(VersionBin) -> end. do_import(BackupFileName, Opts) -> - BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)), + BackupDir = ?backup_path(filename:basename(BackupFileName, ?TAR_SUFFIX)), maybe_print("Importing data from ~p...~n", [BackupFileName], Opts), try ok = validate_backup_name(BackupFileName), @@ -619,7 +770,7 @@ validate_cluster_hocon(RawConf) -> do_import_conf(RawConf, Opts) -> GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))), - maybe_print_errors(GenConfErrs, Opts), + maybe_print_conf_errors(GenConfErrs, Opts), Errors = lists:foldl( fun(Module, ErrorsAcc) -> @@ -634,7 +785,7 @@ do_import_conf(RawConf, Opts) -> GenConfErrs, sort_importer_modules(find_behaviours(emqx_config_backup)) ), - maybe_print_errors(Errors, Opts), + maybe_print_conf_errors(Errors, Opts), Errors. sort_importer_modules(Modules) -> @@ -677,17 +828,17 @@ maybe_print_changed(Changed, Opts) -> Changed ). -maybe_print_errors(Errors, Opts) -> - maps:foreach( - fun(Path, Err) -> - maybe_print( - "Failed to import the following config path: ~p, reason: ~p~n", - [pretty_path(Path), Err], - Opts - ) - end, - Errors - ). +maybe_print_conf_errors(Errors, Opts) -> + maps:foreach(conf_error_formatter(Opts), Errors). + +conf_error_formatter(Opts) -> + fun(Path, Err) -> + maybe_print( + "Failed to import the following config path: ~p, reason: ~p~n", + [pretty_path(Path), Err], + Opts + ) + end. filter_errors(Results) -> maps:filter( @@ -727,7 +878,7 @@ lookup_file(FileName) -> %% Only lookup by basename, don't allow to lookup by file path case FileName =:= filename:basename(FileName) of true -> - FilePath = filename:join(root_backup_dir(), FileName), + FilePath = ?backup_path(FileName), case filelib:is_file(FilePath) of true -> {ok, FilePath}; false -> {error, not_found} diff --git a/apps/emqx_management/src/proto/emqx_mgmt_data_backup_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_data_backup_proto_v1.erl new file mode 100644 index 000000000..066961c00 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_data_backup_proto_v1.erl @@ -0,0 +1,51 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_data_backup_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + import_file/4, + list_files/2, + read_file/3, + delete_file/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.4.0". + +-spec list_files([node()], timeout()) -> + emqx_rpc:erpc_multicall({non_neg_integer(), map()}). +list_files(Nodes, Timeout) -> + erpc:multicall(Nodes, emqx_mgmt_data_backup, list_files, [], Timeout). + +-spec import_file(node(), node(), binary(), timeout()) -> + emqx_mgmt_data_backup:import_res() | {badrpc, _}. +import_file(Node, FileNode, FileName, Timeout) -> + rpc:call(Node, emqx_mgmt_data_backup, maybe_copy_and_import, [FileNode, FileName], Timeout). + +-spec read_file(node(), binary(), timeout()) -> + {ok, binary()} | {error, _} | {bardrpc, _}. +read_file(Node, FileName, Timeout) -> + rpc:call(Node, emqx_mgmt_data_backup, read_file, [FileName], Timeout). + +-spec delete_file(node(), binary(), timeout()) -> ok | {error, _} | {bardrpc, _}. +delete_file(Node, FileName, Timeout) -> + rpc:call(Node, emqx_mgmt_data_backup, delete_file, [FileName], Timeout). diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl new file mode 100644 index 000000000..cef32ab92 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE.erl @@ -0,0 +1,355 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_api_data_backup_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(NODE1_PORT, 18085). +-define(NODE2_PORT, 18086). +-define(NODE3_PORT, 18087). +-define(api_base_url(_Port_), ("http://127.0.0.1:" ++ (integer_to_list(_Port_)))). + +-define(UPLOAD_EE_BACKUP, "emqx-export-upload-ee.tar.gz"). +-define(UPLOAD_CE_BACKUP, "emqx-export-upload-ce.tar.gz"). +-define(BAD_UPLOAD_BACKUP, "emqx-export-bad-upload.tar.gz"). +-define(BAD_IMPORT_BACKUP, "emqx-export-bad-file.tar.gz"). +-define(backup_path(_Config_, _BackupName_), + filename:join(?config(data_dir, _Config_), _BackupName_) +). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_) -> + ok. + +init_per_testcase(TC, Config) when + TC =:= t_upload_ee_backup; + TC =:= t_import_ee_backup +-> + case emqx_release:edition() of + ee -> do_init_per_testcase(TC, Config); + ce -> Config + end; +init_per_testcase(TC, Config) -> + do_init_per_testcase(TC, Config). + +end_per_testcase(_TC, Config) -> + case ?config(cluster, Config) of + undefined -> ok; + Cluster -> emqx_cth_cluster:stop(Cluster) + end. + +t_export_backup(Config) -> + Auth = ?config(auth, Config), + export_test(?NODE1_PORT, Auth), + export_test(?NODE2_PORT, Auth), + export_test(?NODE3_PORT, Auth). + +t_delete_backup(Config) -> + test_file_op(delete, Config). + +t_get_backup(Config) -> + test_file_op(get, Config). + +t_list_backups(Config) -> + Auth = ?config(auth, Config), + + [{ok, _} = export_backup(?NODE1_PORT, Auth) || _ <- lists:seq(1, 10)], + [{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)], + + {ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>), + #{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody), + ?assertEqual(20, length(Data)), + + {ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>), + #{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody), + ?assertEqual(0, length(EmptyData)), + + {ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>), + {ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>), + {ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>), + + #{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1), + ?assertEqual(10, length(DataP1)), + #{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2), + ?assertEqual(10, length(DataP2)), + #{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3), + ?assertEqual(0, length(DataP3)), + + ?assertEqual(Data, DataP1 ++ DataP2). + +t_upload_ce_backup(Config) -> + upload_backup_test(Config, ?UPLOAD_CE_BACKUP). + +t_upload_ee_backup(Config) -> + case emqx_release:edition() of + ee -> upload_backup_test(Config, ?UPLOAD_EE_BACKUP); + ce -> ok + end. + +t_import_ce_backup(Config) -> + import_backup_test(Config, ?UPLOAD_CE_BACKUP). + +t_import_ee_backup(Config) -> + case emqx_release:edition() of + ee -> import_backup_test(Config, ?UPLOAD_EE_BACKUP); + ce -> ok + end. + +do_init_per_testcase(TC, Config) -> + Cluster = [Core1, _Core2, Repl] = cluster(TC, Config), + Auth = auth_header(Core1), + ok = wait_for_auth_replication(Repl), + [{auth, Auth}, {cluster, Cluster} | Config]. + +test_file_op(Method, Config) -> + Auth = ?config(auth, Config), + + {ok, Node1Resp} = export_backup(?NODE1_PORT, Auth), + {ok, Node2Resp} = export_backup(?NODE2_PORT, Auth), + {ok, Node3Resp} = export_backup(?NODE3_PORT, Auth), + + ParsedResps = [emqx_utils_json:decode(R) || R <- [Node1Resp, Node2Resp, Node3Resp]], + + [Node1Parsed, Node2Parsed, Node3Parsed] = ParsedResps, + + %% node param is not set in Query, expect get/delete the backup on the local node + F1 = fun() -> + backup_file_op(Method, ?NODE1_PORT, Auth, maps:get(<<"filename">>, Node1Parsed), []) + end, + ?assertMatch({ok, _}, F1()), + assert_second_call(Method, F1()), + + %% Node 2 must get/delete the backup on Node 3 via rpc + F2 = fun() -> + backup_file_op( + Method, + ?NODE2_PORT, + Auth, + maps:get(<<"filename">>, Node3Parsed), + [{<<"node">>, maps:get(<<"node">>, Node3Parsed)}] + ) + end, + ?assertMatch({ok, _}, F2()), + assert_second_call(Method, F2()), + + %% The same as above but nodes are switched + F3 = fun() -> + backup_file_op( + Method, + ?NODE3_PORT, + Auth, + maps:get(<<"filename">>, Node2Parsed), + [{<<"node">>, maps:get(<<"node">>, Node2Parsed)}] + ) + end, + ?assertMatch({ok, _}, F3()), + assert_second_call(Method, F3()). + +export_test(NodeApiPort, Auth) -> + {ok, RespBody} = export_backup(NodeApiPort, Auth), + #{ + <<"created_at">> := _, + <<"created_at_sec">> := CreatedSec, + <<"filename">> := _, + <<"node">> := _, + <<"size">> := Size + } = emqx_utils_json:decode(RespBody), + ?assert(is_integer(Size)), + ?assert(is_integer(CreatedSec) andalso CreatedSec > 0). + +upload_backup_test(Config, BackupName) -> + Auth = ?config(auth, Config), + UploadFile = ?backup_path(Config, BackupName), + BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP), + BadUploadFile = ?backup_path(Config, ?BAD_UPLOAD_BACKUP), + + ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)), + %% This file was specially forged to pass upload validation bat fail on import + ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)), + ?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)). + +import_backup_test(Config, BackupName) -> + Auth = ?config(auth, Config), + UploadFile = ?backup_path(Config, BackupName), + BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP), + + ?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)), + + %% This file was specially forged to pass upload validation bat fail on import + ?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)), + + %% Replicant node must be able to import the file by doing rpc to a core node + ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)), + + [N1, N2, N3] = ?config(cluster, Config), + + ?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)), + + ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N3)), + %% Now this node must also have the file locally + ?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N1)), + + ?assertMatch({error, {_, 400, _}}, import_backup(?NODE2_PORT, Auth, ?BAD_IMPORT_BACKUP, N2)). + +assert_second_call(get, Res) -> + ?assertMatch({ok, _}, Res); +assert_second_call(delete, Res) -> + ?assertMatch({error, {_, 404, _}}, Res). + +export_backup(NodeApiPort, Auth) -> + Path = ["data", "export"], + request(post, NodeApiPort, Path, Auth). + +import_backup(NodeApiPort, Auth, BackupName) -> + import_backup(NodeApiPort, Auth, BackupName, undefined). + +import_backup(NodeApiPort, Auth, BackupName, Node) -> + Path = ["data", "import"], + Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)}, + Body1 = + case Node of + undefined -> Body; + _ -> Body#{<<"node">> => Node} + end, + request(post, NodeApiPort, Path, Body1, Auth). + +list_backups(NodeApiPort, Auth, Page, Limit) -> + Path = ["data", "files"], + request(get, NodeApiPort, Path, [{<<"page">>, Page}, {<<"limit">>, Limit}], [], Auth). + +backup_file_op(Method, NodeApiPort, Auth, BackupName, QueryList) -> + Path = ["data", "files", BackupName], + request(Method, NodeApiPort, Path, QueryList, [], Auth). + +upload_backup(NodeApiPort, Auth, BackupFilePath) -> + Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodeApiPort), ["data", "files"]), + Res = emqx_mgmt_api_test_util:upload_request( + Path, + BackupFilePath, + "filename", + <<"application/octet-stream">>, + [], + Auth + ), + case Res of + {ok, {{"HTTP/1.1", 204, _}, _Headers, _}} -> + ok; + {ok, {{"HTTP/1.1", 400, _}, _Headers, _} = Resp} -> + ct:pal("Backup upload failed: ~p", [Resp]), + {error, bad_request}; + Err -> + Err + end. + +request(Method, NodePort, PathParts, Auth) -> + request(Method, NodePort, PathParts, [], [], Auth). + +request(Method, NodePort, PathParts, Body, Auth) -> + request(Method, NodePort, PathParts, [], Body, Auth). + +request(Method, NodePort, PathParts, QueryList, Body, Auth) -> + Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodePort), PathParts), + Query = unicode:characters_to_list(uri_string:compose_query(QueryList)), + emqx_mgmt_api_test_util:request_api(Method, Path, Query, Auth, Body). + +cluster(TC, Config) -> + Nodes = emqx_cth_cluster:start( + [ + {api_data_backup_core1, #{role => core, apps => apps_spec(18085, TC)}}, + {api_data_backup_core2, #{role => core, apps => apps_spec(18086, TC)}}, + {api_data_backup_replicant, #{role => replicant, apps => apps_spec(18087, TC)}} + ], + #{work_dir => emqx_cth_suite:work_dir(TC, Config)} + ), + Nodes. + +auth_header(Node) -> + {ok, API} = erpc:call(Node, emqx_common_test_http, create_default_app, []), + emqx_common_test_http:auth_header(API). + +wait_for_auth_replication(ReplNode) -> + wait_for_auth_replication(ReplNode, 100). + +wait_for_auth_replication(ReplNode, 0) -> + {error, {ReplNode, auth_not_ready}}; +wait_for_auth_replication(ReplNode, Retries) -> + try + {_Header, _Val} = erpc:call(ReplNode, emqx_common_test_http, default_auth_header, []), + ok + catch + _:_ -> + timer:sleep(1), + wait_for_auth_replication(ReplNode, Retries - 1) + end. + +apps_spec(APIPort, TC) -> + common_apps_spec() ++ + app_spec_dashboard(APIPort) ++ + upload_import_apps_spec(TC). + +common_apps_spec() -> + [ + emqx, + emqx_conf, + emqx_management + ]. + +app_spec_dashboard(APIPort) -> + [ + {emqx_dashboard, #{ + config => + #{ + dashboard => + #{ + listeners => + #{ + http => + #{bind => APIPort} + }, + default_username => "", + default_password => "" + } + } + }} + ]. + +upload_import_apps_spec(TC) when + TC =:= t_upload_ee_backup; + TC =:= t_import_ee_backup; + TC =:= t_upload_ce_backup; + TC =:= t_import_ce_backup +-> + [ + emqx_auth, + emqx_auth_http, + emqx_auth_jwt, + emqx_auth_mnesia, + emqx_rule_engine, + emqx_modules, + emqx_bridge + ]; +upload_import_apps_spec(_TC) -> + []. diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-file.tar.gz b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-file.tar.gz new file mode 100644 index 000000000..ee56fcbe6 Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-file.tar.gz differ diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-upload.tar.gz b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-upload.tar.gz new file mode 100644 index 000000000..569a58a2c Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-bad-upload.tar.gz differ diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ce.tar.gz b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ce.tar.gz new file mode 100644 index 000000000..1c74cb4f8 Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ce.tar.gz differ diff --git a/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ee.tar.gz b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ee.tar.gz new file mode 100644 index 000000000..5026c1554 Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_api_data_backup_SUITE_data/emqx-export-upload-ee.tar.gz differ diff --git a/changes/ce/feat-12017.en.md b/changes/ce/feat-12017.en.md new file mode 100644 index 000000000..d364c5655 --- /dev/null +++ b/changes/ce/feat-12017.en.md @@ -0,0 +1 @@ +Implemented HTTP API for configuration and user data import/export.