feat(emqx_management): implement data backup API

Serge Tupchii 2023-11-23 23:10:49 +02:00
parent b5a00ec6b2
commit 64ee29af81
10 changed files with 950 additions and 30 deletions


@@ -39,6 +39,7 @@
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
{emqx_mgmt_cluster,2}.
{emqx_mgmt_data_backup,1}.
{emqx_mgmt_trace,1}.
{emqx_mgmt_trace,2}.
{emqx_node_rebalance,1}.


@@ -0,0 +1,361 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_data_backup).
-behaviour(minirest_api).
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([api_spec/0, paths/0, schema/1, fields/1]).
-export([
data_export/2,
data_import/2,
data_files/2,
data_file_by_name/2
]).
-define(TAGS, [<<"Data Backup">>]).
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(node_field(IsRequired), ?node_field(IsRequired, #{})).
-define(node_field(IsRequired, Meta),
{node, ?HOCON(binary(), Meta#{desc => "Node name", required => IsRequired})}
).
-define(filename_field(IsRequired), ?filename_field(IsRequired, #{})).
-define(filename_field(IsRequired, Meta),
{filename,
?HOCON(binary(), Meta#{
desc => "Data backup file name",
required => IsRequired
})}
).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/data/export",
"/data/import",
"/data/files",
"/data/files/:filename"
].
schema("/data/export") ->
#{
'operationId' => data_export,
post => #{
tags => ?TAGS,
desc => <<"Export a data backup file">>,
responses => #{
200 =>
emqx_dashboard_swagger:schema_with_example(
?R_REF(backup_file_info),
backup_file_info_example()
)
}
}
};
schema("/data/import") ->
#{
'operationId' => data_import,
post => #{
tags => ?TAGS,
desc => <<"Import a data backup file">>,
'requestBody' => emqx_dashboard_swagger:schema_with_example(
?R_REF(import_request_body),
maps:with([node, filename], backup_file_info_example())
),
responses => #{
204 => <<"No Content">>,
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Backup file import failed">>
)
}
}
};
schema("/data/files") ->
#{
'operationId' => data_files,
post => #{
tags => ?TAGS,
desc => <<"Upload a data backup file">>,
'requestBody' => emqx_dashboard_swagger:file_schema(filename),
responses => #{
204 => <<"No Content">>,
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad backup file">>
)
}
},
get => #{
tags => ?TAGS,
desc => <<"List backup files">>,
parameters => [
?R_REF(emqx_dashboard_swagger, page),
?R_REF(emqx_dashboard_swagger, limit)
],
responses => #{
200 =>
emqx_dashboard_swagger:schema_with_example(
?R_REF(files_response),
files_response_example()
)
}
}
};
schema("/data/files/:filename") ->
#{
'operationId' => data_file_by_name,
get => #{
tags => ?TAGS,
desc => <<"Download a data backup file">>,
parameters => [
?filename_field(true, #{in => path}),
?node_field(false, #{in => query})
],
responses => #{
200 => ?HOCON(binary()),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>
),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Backup file not found">>
)
}
},
delete => #{
tags => ?TAGS,
desc => <<"Delete a data backup file">>,
parameters => [
?filename_field(true, #{in => path}),
?node_field(false, #{in => query})
],
responses => #{
204 => <<"No Content">>,
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>
),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Backup file not found">>
)
}
}
}.
fields(files_response) ->
[
{data, ?ARRAY(?R_REF(backup_file_info))},
{meta, ?R_REF(emqx_dashboard_swagger, meta)}
];
fields(backup_file_info) ->
[
?node_field(true),
?filename_field(true),
{created_at,
?HOCON(binary(), #{
desc => "Data backup file creation date and time",
required => true
})}
];
fields(import_request_body) ->
[?node_field(false), ?filename_field(true)];
fields(data_backup_file) ->
[
?filename_field(true),
{file,
?HOCON(binary(), #{
desc => "Data backup file content",
required => true
})}
].
%%------------------------------------------------------------------------------
%% HTTP API Callbacks
%%------------------------------------------------------------------------------
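%% Export runs on the node that handles the request; only the file basename is
%% returned, since the full path is node-local.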
data_export(post, _Request) ->
case emqx_mgmt_data_backup:export() of
{ok, #{filename := FileName} = File} ->
{200, File#{filename => filename:basename(FileName)}};
Error ->
Error
end.
data_import(post, #{body := #{<<"filename">> := FileName} = Body}) ->
case safe_parse_node(Body) of
{error, Msg} ->
{400, #{code => 'BAD_REQUEST', message => Msg}};
FileNode ->
CoreNode = core_node(FileNode),
response(
emqx_mgmt_data_backup_proto_v1:import_file(CoreNode, FileNode, FileName, infinity)
)
end.
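%% Import must be performed on a core node. Prefer the node holding the backup
%% file if it is a core node; otherwise fall back to the local node (if core)
%% or to the cluster coordinator.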
core_node(FileNode) ->
case mria_rlog:role(FileNode) of
core ->
FileNode;
replicant ->
case mria_rlog:role() of
core ->
node();
replicant ->
mria_membership:coordinator()
end
end.
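%% The decoded multipart body is a map with a `type` key plus a single
%% FileName => FileContent pair; drop `type` and take the first pair.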
data_files(post, #{body := #{<<"filename">> := #{type := _} = File}}) ->
[{FileName, FileContent} | _] = maps:to_list(maps:without([type], File)),
case emqx_mgmt_data_backup:upload(FileName, FileContent) of
ok ->
{204};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => emqx_mgmt_data_backup:format_error(Reason)}}
end;
data_files(get, #{query_string := PageParams}) ->
case emqx_mgmt_api:parse_pager_params(PageParams) of
false ->
{400, #{code => ?BAD_REQUEST, message => <<"page_limit_invalid">>}};
#{page := Page, limit := Limit} = Pager ->
{200, #{data => list_backup_files(Page, Limit), meta => Pager}}
end.
data_file_by_name(Method, #{bindings := #{filename := Filename}, query_string := QS}) ->
case safe_parse_node(QS) of
{error, Msg} ->
{400, #{code => 'BAD_REQUEST', message => Msg}};
Node ->
case get_or_delete_file(Method, Filename, Node) of
{error, not_found} ->
{404, #{
code => ?NOT_FOUND, message => emqx_mgmt_data_backup:format_error(not_found)
}};
Other ->
response(Other)
end
end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
get_or_delete_file(get, Filename, Node) ->
emqx_mgmt_data_backup_proto_v1:read_file(Node, Filename, infinity);
get_or_delete_file(delete, Filename, Node) ->
emqx_mgmt_data_backup_proto_v1:delete_file(Node, Filename, infinity).
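%% Accept only names of currently running nodes: the binary is converted to an
%% atom only after a successful membership check, so arbitrary user input
%% cannot create new atoms.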
safe_parse_node(#{<<"node">> := NodeBin}) ->
NodesBin = [erlang:atom_to_binary(N, utf8) || N <- emqx:running_nodes()],
case lists:member(NodeBin, NodesBin) of
true -> erlang:binary_to_atom(NodeBin, utf8);
false -> {error, io_lib:format("Unknown node: ~s", [NodeBin])}
end;
safe_parse_node(_) ->
node().
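%% Translate backup operation results into HTTP responses: success (including
%% imports whose error maps are empty) becomes 200/204, anything else a
%% formatted 400.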
response({ok, #{db_errors := DbErrs, config_errors := ConfErrs}}) ->
case DbErrs =:= #{} andalso ConfErrs =:= #{} of
true ->
{204};
false ->
DbErrs1 = emqx_mgmt_data_backup:format_db_errors(DbErrs),
ConfErrs1 = emqx_mgmt_data_backup:format_conf_errors(ConfErrs),
Msg = unicode:characters_to_binary(io_lib:format("~s", [DbErrs1 ++ ConfErrs1])),
{400, #{code => ?BAD_REQUEST, message => Msg}}
end;
response({ok, Res}) ->
{200, Res};
response(ok) ->
{204};
response({error, Reason}) ->
{400, #{code => ?BAD_REQUEST, message => emqx_mgmt_data_backup:format_error(Reason)}}.
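%% Pagination is 1-based and applied in memory to the cluster-wide sorted list.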
list_backup_files(Page, Limit) ->
Start = Page * Limit - Limit + 1,
lists:sublist(list_backup_files(), Start, Limit).
list_backup_files() ->
Nodes = emqx:running_nodes(),
Results = emqx_mgmt_data_backup_proto_v1:list_files(Nodes, 300_000),
NodeResults = lists:zip(Nodes, Results),
{Successes, Failures} =
lists:partition(
fun({_Node, Result}) ->
case Result of
{ok, _} -> true;
_ -> false
end
end,
NodeResults
),
case Failures of
[] ->
ok;
[_ | _] ->
?SLOG(error, #{msg => "list_exported_backup_files_failed", node_errors => Failures})
end,
FileList = [FileInfo || {_Node, {ok, FileInfos}} <- Successes, FileInfo <- FileInfos],
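%% Newest files first; ties on creation time are broken by filename, descending.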
lists:sort(
fun(#{created_at_sec := T1, filename := F1}, #{created_at_sec := T2, filename := F2}) ->
case T1 =:= T2 of
true -> F1 >= F2;
false -> T1 > T2
end
end,
FileList
).
backup_file_info_example() ->
#{
created_at => <<"2023-11-23T19:13:19+02:00">>,
created_at_sec => 1700759599,
filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>,
node => 'emqx@127.0.0.1',
size => 22740
}.
files_response_example() ->
#{
data => [
#{
created_at => <<"2023-09-02T11:11:33+02:00">>,
created_at_sec => 1693645893,
filename => <<"emqx-export-2023-09-02-11-11-33.012.tar.gz">>,
node => 'emqx@127.0.0.1',
size => 22740
},
#{
created_at => <<"2023-11-23T19:13:19+02:00">>,
created_at_sec => 1700759599,
filename => <<"emqx-export-2023-11-23-19-13-19.043.tar.gz">>,
node => 'emqx@127.0.0.1',
size => 22740
}
],
meta => #{
page => 0,
limit => 20,
count => 300
}
}.


@@ -24,8 +24,21 @@
format_error/1
]).
%% HTTP API
-export([
upload/2,
maybe_copy_and_import/2,
read_file/1,
delete_file/1,
list_files/0,
format_conf_errors/1,
format_db_errors/1
]).
-export([default_validate_mnesia_backup/1]).
-export_type([import_res/0]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
@@ -80,17 +93,21 @@
end
end()
).
-define(backup_path(_FileName_), filename:join(root_backup_dir(), _FileName_)).
-type backup_file_info() :: #{
- filename => binary(),
- size => non_neg_integer(),
- created_at => binary(),
- node => node(),
+ filename := binary(),
+ size := non_neg_integer(),
+ created_at := binary(),
+ created_at_sec := integer(),
+ node := node(),
atom() => _
}.
-type db_error_details() :: #{mria:table() => {error, _}}.
-type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}.
-type import_res() ::
{ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} | {error, _}.
%%------------------------------------------------------------------------------
%% APIs
@@ -120,15 +137,11 @@ export(Opts) ->
file:del_dir_r(BackupName)
end.
- -spec import(file:filename_all()) ->
- {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
- | {error, _}.
+ -spec import(file:filename_all()) -> import_res().
import(BackupFileName) ->
import(BackupFileName, ?DEFAULT_OPTS).
- -spec import(file:filename_all(), map()) ->
- {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
- | {error, _}.
+ -spec import(file:filename_all(), map()) -> import_res().
import(BackupFileName, Opts) ->
case is_import_allowed() of
true ->
@@ -142,6 +155,74 @@ import(BackupFileName, Opts) ->
{error, not_core_node}
end.
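%% Import the backup on this (core) node; if the file lives on another node
%% and is not already present locally, copy it over first.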
-spec maybe_copy_and_import(node(), file:filename_all()) -> import_res().
maybe_copy_and_import(FileNode, BackupFileName) when FileNode =:= node() ->
import(BackupFileName, #{});
maybe_copy_and_import(FileNode, BackupFileName) ->
%% The file may already be present locally
case filelib:is_file(?backup_path(str(BackupFileName))) of
true ->
import(BackupFileName, #{});
false ->
copy_and_import(FileNode, BackupFileName)
end.
-spec read_file(file:filename_all()) ->
{ok, #{filename => file:filename_all(), file => binary()}} | {error, _}.
read_file(BackupFileName) ->
BackupFileNameStr = str(BackupFileName),
case validate_backup_name(BackupFileNameStr) of
ok ->
maybe_not_found(file:read_file(?backup_path(BackupFileName)));
Err ->
Err
end.
-spec delete_file(file:filename_all()) -> ok | {error, _}.
delete_file(BackupFileName) ->
BackupFileNameStr = str(BackupFileName),
case validate_backup_name(BackupFileNameStr) of
ok ->
maybe_not_found(file:delete(?backup_path(BackupFileName)));
Err ->
Err
end.
-spec upload(file:filename_all(), binary()) -> ok | {error, _}.
upload(BackupFileName, BackupFileContent) ->
BackupFileNameStr = str(BackupFileName),
FilePath = ?backup_path(BackupFileNameStr),
case filelib:is_file(FilePath) of
true ->
{error, {already_exists, BackupFileNameStr}};
false ->
do_upload(BackupFileNameStr, BackupFileContent)
end.
-spec list_files() -> [backup_file_info()].
list_files() ->
Filter =
fun(File) ->
case file:read_file_info(File, [{time, posix}]) of
{ok, #file_info{size = Size, ctime = CTimeSec}} ->
BaseFilename = bin(filename:basename(File)),
Info = #{
filename => BaseFilename,
size => Size,
created_at => emqx_utils_calendar:epoch_to_rfc3339(CTimeSec, second),
created_at_sec => CTimeSec,
node => node()
},
{true, Info};
_ ->
false
end
end,
lists:filtermap(Filter, backup_files()).
backup_files() ->
filelib:wildcard(?backup_path("*" ++ ?TAR_SUFFIX)).
format_error(not_core_node) ->
str(
io_lib:format(
@@ -170,13 +251,83 @@ format_error({unsupported_version, ImportVersion}) ->
[str(ImportVersion), str(emqx_release:version())]
)
);
format_error({already_exists, BackupFileName}) ->
str(io_lib:format("Backup file \"~s\" already exists", [BackupFileName]));
format_error(Reason) ->
Reason.
format_conf_errors(Errors) ->
Opts = #{print_fun => fun io_lib:format/2},
maps:values(maps:map(conf_error_formatter(Opts), Errors)).
format_db_errors(Errors) ->
Opts = #{print_fun => fun io_lib:format/2},
maps:values(
maps:map(
fun(Tab, Err) -> maybe_print_mnesia_import_err(Tab, Err, Opts) end,
Errors
)
).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
copy_and_import(FileNode, BackupFileName) ->
case emqx_mgmt_data_backup_proto_v1:read_file(FileNode, BackupFileName, infinity) of
{ok, BackupFileContent} ->
case upload(BackupFileName, BackupFileContent) of
ok ->
import(BackupFileName, #{});
Err ->
Err
end;
Err ->
Err
end.
%% For compatibility with the import API, which uses lookup_file/1 and returns a `not_found` reason
maybe_not_found({error, enoent}) ->
{error, not_found};
maybe_not_found(Other) ->
Other.
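%% Upload pipeline: validate the file name, persist the archive, extract it,
%% and validate the contents (including cluster.hocon when present) before
%% accepting it. The extracted directory is removed in all cases.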
do_upload(BackupFileNameStr, BackupFileContent) ->
FilePath = ?backup_path(BackupFileNameStr),
BackupDir = ?backup_path(filename:basename(BackupFileNameStr, ?TAR_SUFFIX)),
try
ok = validate_backup_name(BackupFileNameStr),
ok = file:write_file(FilePath, BackupFileContent),
ok = extract_backup(FilePath),
{ok, _} = validate_backup(BackupDir),
HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
case filelib:is_regular(HoconFileName) of
true ->
{ok, RawConf} = hocon:files([HoconFileName]),
RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
{ok, _} = validate_cluster_hocon(RawConf1),
ok;
false ->
%% cluster.hocon may legitimately be missing from the backup
ok
end,
?SLOG(info, #{msg => "emqx_data_upload_success"})
catch
error:{badmatch, {error, Reason}}:Stack ->
?SLOG(error, #{msg => "emqx_data_upload_failed", reason => Reason, stacktrace => Stack}),
{error, Reason};
Class:Reason:Stack ->
?SLOG(error, #{
msg => "emqx_data_upload_failed",
exception => Class,
reason => Reason,
stacktrace => Stack
}),
{error, Reason}
after
file:del_dir_r(BackupDir)
end.
prepare_new_backup(Opts) ->
Ts = erlang:system_time(millisecond),
{{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
@@ -186,7 +337,7 @@ prepare_new_backup(Opts) ->
[Y, M, D, HH, MM, SS, Ts rem 1000]
)
),
- BackupName = filename:join(root_backup_dir(), BackupBaseName),
+ BackupName = ?backup_path(BackupBaseName),
BackupTarName = ?tar(BackupName),
maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
{ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
@@ -208,13 +359,13 @@ do_export(BackupName, TarDescriptor, Opts) ->
ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
{ok, #file_info{
size = Size,
- ctime = {{Y1, M1, D1}, {H1, MM1, S1}}
- }} = file:read_file_info(BackupTarName),
- CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]),
+ ctime = CTime
+ }} = file:read_file_info(BackupTarName, [{time, posix}]),
{ok, #{
filename => bin(BackupTarName),
size => Size,
- created_at => bin(CreatedAt),
+ created_at => emqx_utils_calendar:epoch_to_rfc3339(CTime, second),
+ created_at_sec => CTime,
node => node()
}}.
@@ -351,7 +502,7 @@ parse_version_no_patch(VersionBin) ->
end.
do_import(BackupFileName, Opts) ->
- BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)),
+ BackupDir = ?backup_path(filename:basename(BackupFileName, ?TAR_SUFFIX)),
maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
try
ok = validate_backup_name(BackupFileName),
@@ -619,7 +770,7 @@ validate_cluster_hocon(RawConf) ->
do_import_conf(RawConf, Opts) ->
GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
- maybe_print_errors(GenConfErrs, Opts),
+ maybe_print_conf_errors(GenConfErrs, Opts),
Errors =
lists:foldl(
fun(Module, ErrorsAcc) ->
@@ -634,7 +785,7 @@ do_import_conf(RawConf, Opts) ->
GenConfErrs,
sort_importer_modules(find_behaviours(emqx_config_backup))
),
- maybe_print_errors(Errors, Opts),
+ maybe_print_conf_errors(Errors, Opts),
Errors.
sort_importer_modules(Modules) ->
@@ -677,17 +828,17 @@ maybe_print_changed(Changed, Opts) ->
Changed
).
- maybe_print_errors(Errors, Opts) ->
- maps:foreach(
- fun(Path, Err) ->
- maybe_print(
- "Failed to import the following config path: ~p, reason: ~p~n",
- [pretty_path(Path), Err],
- Opts
- )
- end,
- Errors
- ).
+ maybe_print_conf_errors(Errors, Opts) ->
+ maps:foreach(conf_error_formatter(Opts), Errors).
+ conf_error_formatter(Opts) ->
+ fun(Path, Err) ->
+ maybe_print(
+ "Failed to import the following config path: ~p, reason: ~p~n",
+ [pretty_path(Path), Err],
+ Opts
+ )
+ end.
filter_errors(Results) ->
maps:filter(
@@ -727,7 +878,7 @@ lookup_file(FileName) ->
%% Only lookup by basename, don't allow to lookup by file path
case FileName =:= filename:basename(FileName) of
true ->
- FilePath = filename:join(root_backup_dir(), FileName),
+ FilePath = ?backup_path(FileName),
case filelib:is_file(FilePath) of
true -> {ok, FilePath};
false -> {error, not_found}


@@ -0,0 +1,51 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
import_file/4,
list_files/2,
read_file/3,
delete_file/3
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.4.0".
-spec list_files([node()], timeout()) ->
emqx_rpc:erpc_multicall([emqx_mgmt_data_backup:backup_file_info()]).
list_files(Nodes, Timeout) ->
erpc:multicall(Nodes, emqx_mgmt_data_backup, list_files, [], Timeout).
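%% Executed on a core node (Node); the import may first need to copy the file
%% from FileNode, hence a plain rpc:call that honors the caller's Timeout.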
-spec import_file(node(), node(), binary(), timeout()) ->
emqx_mgmt_data_backup:import_res() | {badrpc, _}.
import_file(Node, FileNode, FileName, Timeout) ->
rpc:call(Node, emqx_mgmt_data_backup, maybe_copy_and_import, [FileNode, FileName], Timeout).
-spec read_file(node(), binary(), timeout()) ->
{ok, binary()} | {error, _} | {badrpc, _}.
read_file(Node, FileName, Timeout) ->
rpc:call(Node, emqx_mgmt_data_backup, read_file, [FileName], Timeout).
-spec delete_file(node(), binary(), timeout()) -> ok | {error, _} | {badrpc, _}.
delete_file(Node, FileName, Timeout) ->
rpc:call(Node, emqx_mgmt_data_backup, delete_file, [FileName], Timeout).


@@ -0,0 +1,355 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_data_backup_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(NODE1_PORT, 18085).
-define(NODE2_PORT, 18086).
-define(NODE3_PORT, 18087).
-define(api_base_url(_Port_), ("http://127.0.0.1:" ++ (integer_to_list(_Port_)))).
-define(UPLOAD_EE_BACKUP, "emqx-export-upload-ee.tar.gz").
-define(UPLOAD_CE_BACKUP, "emqx-export-upload-ce.tar.gz").
-define(BAD_UPLOAD_BACKUP, "emqx-export-bad-upload.tar.gz").
-define(BAD_IMPORT_BACKUP, "emqx-export-bad-file.tar.gz").
-define(backup_path(_Config_, _BackupName_),
filename:join(?config(data_dir, _Config_), _BackupName_)
).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_) ->
ok.
init_per_testcase(TC, Config) when
TC =:= t_upload_ee_backup;
TC =:= t_import_ee_backup
->
case emqx_release:edition() of
ee -> do_init_per_testcase(TC, Config);
ce -> Config
end;
init_per_testcase(TC, Config) ->
do_init_per_testcase(TC, Config).
end_per_testcase(_TC, Config) ->
case ?config(cluster, Config) of
undefined -> ok;
Cluster -> emqx_cth_cluster:stop(Cluster)
end.
t_export_backup(Config) ->
Auth = ?config(auth, Config),
export_test(?NODE1_PORT, Auth),
export_test(?NODE2_PORT, Auth),
export_test(?NODE3_PORT, Auth).
t_delete_backup(Config) ->
test_file_op(delete, Config).
t_get_backup(Config) ->
test_file_op(get, Config).
t_list_backups(Config) ->
Auth = ?config(auth, Config),
[{ok, _} = export_backup(?NODE1_PORT, Auth) || _ <- lists:seq(1, 10)],
[{ok, _} = export_backup(?NODE2_PORT, Auth) || _ <- lists:seq(1, 10)],
{ok, RespBody} = list_backups(?NODE1_PORT, Auth, <<"1">>, <<"100">>),
#{<<"data">> := Data, <<"meta">> := _} = emqx_utils_json:decode(RespBody),
?assertEqual(20, length(Data)),
{ok, EmptyRespBody} = list_backups(?NODE2_PORT, Auth, <<"2">>, <<"100">>),
#{<<"data">> := EmptyData, <<"meta">> := _} = emqx_utils_json:decode(EmptyRespBody),
?assertEqual(0, length(EmptyData)),
{ok, RespBodyP1} = list_backups(?NODE3_PORT, Auth, <<"1">>, <<"10">>),
{ok, RespBodyP2} = list_backups(?NODE3_PORT, Auth, <<"2">>, <<"10">>),
{ok, RespBodyP3} = list_backups(?NODE3_PORT, Auth, <<"3">>, <<"10">>),
#{<<"data">> := DataP1, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP1),
?assertEqual(10, length(DataP1)),
#{<<"data">> := DataP2, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP2),
?assertEqual(10, length(DataP2)),
#{<<"data">> := DataP3, <<"meta">> := _} = emqx_utils_json:decode(RespBodyP3),
?assertEqual(0, length(DataP3)),
?assertEqual(Data, DataP1 ++ DataP2).
t_upload_ce_backup(Config) ->
upload_backup_test(Config, ?UPLOAD_CE_BACKUP).
t_upload_ee_backup(Config) ->
case emqx_release:edition() of
ee -> upload_backup_test(Config, ?UPLOAD_EE_BACKUP);
ce -> ok
end.
t_import_ce_backup(Config) ->
import_backup_test(Config, ?UPLOAD_CE_BACKUP).
t_import_ee_backup(Config) ->
case emqx_release:edition() of
ee -> import_backup_test(Config, ?UPLOAD_EE_BACKUP);
ce -> ok
end.
do_init_per_testcase(TC, Config) ->
Cluster = [Core1, _Core2, Repl] = cluster(TC, Config),
Auth = auth_header(Core1),
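%% The replicant can authenticate API requests only after the dashboard
%% credentials have replicated from the core nodes.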
ok = wait_for_auth_replication(Repl),
[{auth, Auth}, {cluster, Cluster} | Config].
test_file_op(Method, Config) ->
Auth = ?config(auth, Config),
{ok, Node1Resp} = export_backup(?NODE1_PORT, Auth),
{ok, Node2Resp} = export_backup(?NODE2_PORT, Auth),
{ok, Node3Resp} = export_backup(?NODE3_PORT, Auth),
ParsedResps = [emqx_utils_json:decode(R) || R <- [Node1Resp, Node2Resp, Node3Resp]],
[Node1Parsed, Node2Parsed, Node3Parsed] = ParsedResps,
%% The node param is not set in the query string; get/delete is expected to operate on the local node's backup file
F1 = fun() ->
backup_file_op(Method, ?NODE1_PORT, Auth, maps:get(<<"filename">>, Node1Parsed), [])
end,
?assertMatch({ok, _}, F1()),
assert_second_call(Method, F1()),
%% Node 2 must get/delete the backup on Node 3 via rpc
F2 = fun() ->
backup_file_op(
Method,
?NODE2_PORT,
Auth,
maps:get(<<"filename">>, Node3Parsed),
[{<<"node">>, maps:get(<<"node">>, Node3Parsed)}]
)
end,
?assertMatch({ok, _}, F2()),
assert_second_call(Method, F2()),
%% The same as above but nodes are switched
F3 = fun() ->
backup_file_op(
Method,
?NODE3_PORT,
Auth,
maps:get(<<"filename">>, Node2Parsed),
[{<<"node">>, maps:get(<<"node">>, Node2Parsed)}]
)
end,
?assertMatch({ok, _}, F3()),
assert_second_call(Method, F3()).
export_test(NodeApiPort, Auth) ->
{ok, RespBody} = export_backup(NodeApiPort, Auth),
#{
<<"created_at">> := _,
<<"created_at_sec">> := CreatedSec,
<<"filename">> := _,
<<"node">> := _,
<<"size">> := Size
} = emqx_utils_json:decode(RespBody),
?assert(is_integer(Size)),
?assert(is_integer(CreatedSec) andalso CreatedSec > 0).
upload_backup_test(Config, BackupName) ->
Auth = ?config(auth, Config),
UploadFile = ?backup_path(Config, BackupName),
BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
BadUploadFile = ?backup_path(Config, ?BAD_UPLOAD_BACKUP),
?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
%% This file was specially forged to pass upload validation but fail on import
?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
?assertEqual({error, bad_request}, upload_backup(?NODE1_PORT, Auth, BadUploadFile)).
import_backup_test(Config, BackupName) ->
Auth = ?config(auth, Config),
UploadFile = ?backup_path(Config, BackupName),
BadImportFile = ?backup_path(Config, ?BAD_IMPORT_BACKUP),
?assertEqual(ok, upload_backup(?NODE3_PORT, Auth, UploadFile)),
%% This file was specially forged to pass upload validation but fail on import
?assertEqual(ok, upload_backup(?NODE2_PORT, Auth, BadImportFile)),
%% The replicant node must be able to import the file by doing an RPC to a core node
?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
[N1, N2, N3] = ?config(cluster, Config),
?assertMatch({ok, _}, import_backup(?NODE3_PORT, Auth, BackupName)),
?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N3)),
%% Now this node must also have the file locally
?assertMatch({ok, _}, import_backup(?NODE1_PORT, Auth, BackupName, N1)),
?assertMatch({error, {_, 400, _}}, import_backup(?NODE2_PORT, Auth, ?BAD_IMPORT_BACKUP, N2)).
assert_second_call(get, Res) ->
?assertMatch({ok, _}, Res);
assert_second_call(delete, Res) ->
?assertMatch({error, {_, 404, _}}, Res).
export_backup(NodeApiPort, Auth) ->
Path = ["data", "export"],
request(post, NodeApiPort, Path, Auth).
import_backup(NodeApiPort, Auth, BackupName) ->
import_backup(NodeApiPort, Auth, BackupName, undefined).
import_backup(NodeApiPort, Auth, BackupName, Node) ->
Path = ["data", "import"],
Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
Body1 =
case Node of
undefined -> Body;
_ -> Body#{<<"node">> => Node}
end,
request(post, NodeApiPort, Path, Body1, Auth).
list_backups(NodeApiPort, Auth, Page, Limit) ->
Path = ["data", "files"],
request(get, NodeApiPort, Path, [{<<"page">>, Page}, {<<"limit">>, Limit}], [], Auth).
backup_file_op(Method, NodeApiPort, Auth, BackupName, QueryList) ->
Path = ["data", "files", BackupName],
request(Method, NodeApiPort, Path, QueryList, [], Auth).
upload_backup(NodeApiPort, Auth, BackupFilePath) ->
Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodeApiPort), ["data", "files"]),
Res = emqx_mgmt_api_test_util:upload_request(
Path,
BackupFilePath,
"filename",
<<"application/octet-stream">>,
[],
Auth
),
case Res of
{ok, {{"HTTP/1.1", 204, _}, _Headers, _}} ->
ok;
{ok, {{"HTTP/1.1", 400, _}, _Headers, _} = Resp} ->
ct:pal("Backup upload failed: ~p", [Resp]),
{error, bad_request};
Err ->
Err
end.
request(Method, NodePort, PathParts, Auth) ->
request(Method, NodePort, PathParts, [], [], Auth).
request(Method, NodePort, PathParts, Body, Auth) ->
request(Method, NodePort, PathParts, [], Body, Auth).
request(Method, NodePort, PathParts, QueryList, Body, Auth) ->
Path = emqx_mgmt_api_test_util:api_path(?api_base_url(NodePort), PathParts),
Query = unicode:characters_to_list(uri_string:compose_query(QueryList)),
emqx_mgmt_api_test_util:request_api(Method, Path, Query, Auth, Body).
cluster(TC, Config) ->
Nodes = emqx_cth_cluster:start(
[
{api_data_backup_core1, #{role => core, apps => apps_spec(18085, TC)}},
{api_data_backup_core2, #{role => core, apps => apps_spec(18086, TC)}},
{api_data_backup_replicant, #{role => replicant, apps => apps_spec(18087, TC)}}
],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
Nodes.
auth_header(Node) ->
{ok, API} = erpc:call(Node, emqx_common_test_http, create_default_app, []),
emqx_common_test_http:auth_header(API).
wait_for_auth_replication(ReplNode) ->
wait_for_auth_replication(ReplNode, 100).
wait_for_auth_replication(ReplNode, 0) ->
{error, {ReplNode, auth_not_ready}};
wait_for_auth_replication(ReplNode, Retries) ->
try
{_Header, _Val} = erpc:call(ReplNode, emqx_common_test_http, default_auth_header, []),
ok
catch
_:_ ->
timer:sleep(1),
wait_for_auth_replication(ReplNode, Retries - 1)
end.
apps_spec(APIPort, TC) ->
common_apps_spec() ++
app_spec_dashboard(APIPort) ++
upload_import_apps_spec(TC).
common_apps_spec() ->
[
emqx,
emqx_conf,
emqx_management
].
app_spec_dashboard(APIPort) ->
[
{emqx_dashboard, #{
config =>
#{
dashboard =>
#{
listeners =>
#{
http =>
#{bind => APIPort}
},
default_username => "",
default_password => ""
}
}
}}
].
upload_import_apps_spec(TC) when
TC =:= t_upload_ee_backup;
TC =:= t_import_ee_backup;
TC =:= t_upload_ce_backup;
TC =:= t_import_ce_backup
->
[
emqx_auth,
emqx_auth_http,
emqx_auth_jwt,
emqx_auth_mnesia,
emqx_rule_engine,
emqx_modules,
emqx_bridge
];
upload_import_apps_spec(_TC) ->
[].


@@ -0,0 +1 @@
Implemented HTTP API for configuration and user data import/export.
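
For reference, a minimal usage sketch of the new endpoints from Erlang, using the stock httpc client. This is a hedged illustration, not part of the commit: the listener address (127.0.0.1:18083), the /api/v5 base path, and the API key/secret are placeholder assumptions to adjust for a real deployment.

%% Hedged sketch: placeholder host, port, base path, and credentials.
{ok, _} = application:ensure_all_started(inets),
Auth = {"authorization", "Basic " ++ base64:encode_to_string("my_api_key:my_secret")},
%% Trigger an export on the node that receives the request.
{ok, {{_, 200, _}, _, ExportJson}} =
    httpc:request(post, {"http://127.0.0.1:18083/api/v5/data/export", [Auth], "application/json", ""}, [], []),
%% List backup files across the cluster, page 1, up to 20 entries.
{ok, {{_, 200, _}, _, FilesJson}} =
    httpc:request(get, {"http://127.0.0.1:18083/api/v5/data/files?page=1&limit=20", [Auth]}, [], []).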