diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 0e0957adf..d597646d6 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -13,6 +13,7 @@ {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_exhook,1}. +{emqx_ft_storage_exporter_fs,1}. {emqx_ft_storage_fs,1}. {emqx_ft_storage_fs_reader,1}. {emqx_gateway_api_listeners,1}. diff --git a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl index 8df130897..f70f464b3 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl @@ -25,11 +25,12 @@ request/4, multipart_formdata_request/3, multipart_formdata_request/4, + host/0, uri/0, uri/1 ]). --define(HOST, "http://127.0.0.1:18083/"). +-define(HOST, "http://127.0.0.1:18083"). -define(API_VERSION, "v5"). -define(BASE_PATH, "api"). @@ -94,10 +95,13 @@ request(Username, Method, Url, Body) -> {error, Reason} end. +host() -> + ?HOST. + uri() -> uri([]). uri(Parts) when is_list(Parts) -> NParts = [E || E <- Parts], - ?HOST ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])). + host() ++ "/" ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])). auth_header(Username) -> Password = <<"public">>, diff --git a/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf index 0bda935f8..6ac01ea23 100644 --- a/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_api_i18n.conf @@ -11,6 +11,10 @@ emqx_ft_api { } } +} + +emqx_ft_storage_exporter_fs_api { + file_get { desc { en: "Get a file by its id." diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index 143d629de..eeb047b16 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -30,18 +30,12 @@ ]). -export([ - fields/1, roots/0 ]). %% API callbacks -export([ - '/file_transfer/files'/2, - '/file_transfer/file'/2 -]). - --export([ - mk_file_uri/3 + '/file_transfer/files'/2 ]). -import(hoconsc, [mk/2, ref/1, ref/2]). @@ -53,8 +47,7 @@ api_spec() -> paths() -> [ - "/file_transfer/files", - "/file_transfer/file" + "/file_transfer/files" ]. schema("/file_transfer/files") -> @@ -71,31 +64,6 @@ schema("/file_transfer/files") -> ) } } - }; -schema("/file_transfer/file") -> - % TODO - % This is conceptually another API, because this logic is inherent only to the - % local filesystem exporter. Ideally, we won't even publish it if `emqx_ft` is - % configured with another exporter. Accordingly, things that look too specific - % for this module (i.e. `parse_filepath/1`) should go away in another API module. - #{ - 'operationId' => '/file_transfer/file', - get => #{ - tags => [<<"file_transfer">>], - summary => <<"Download a particular file">>, - description => ?DESC("file_get"), - parameters => [ - ref(file_node), - ref(file_ref) - ], - responses => #{ - 200 => <<"Operation success">>, - 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>), - 503 => emqx_dashboard_swagger:error_codes( - ['SERVICE_UNAVAILABLE'], <<"Service unavailable">> - ) - } - } }. '/file_transfer/files'(get, #{}) -> @@ -106,76 +74,11 @@ schema("/file_transfer/file") -> {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} end. -'/file_transfer/file'(get, #{query_string := Query}) -> - try - Node = parse_node(maps:get(<<"node">>, Query)), - Filepath = parse_filepath(maps:get(<<"fileref">>, Query)), - case emqx_ft_storage_fs_proto_v1:read_export_file(Node, Filepath, self()) of - {ok, ReaderPid} -> - FileData = emqx_ft_storage_fs_reader:table(ReaderPid), - {200, - #{ - <<"content-type">> => <<"application/data">>, - <<"content-disposition">> => <<"attachment">> - }, - FileData}; - {error, enoent} -> - {404, error_msg('NOT_FOUND', <<"Not found">>)}; - {error, Error} -> - ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}), - {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} - end - catch - throw:{invalid, Param} -> - {404, - error_msg( - 'NOT_FOUND', - iolist_to_binary(["Invalid query parameter: ", Param]) - )}; - error:{erpc, noconnection} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} - end. - error_msg(Code, Msg) -> #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. --spec fields(hocon_schema:name()) -> hocon_schema:fields(). -fields(file_ref) -> - [ - {fileref, - hoconsc:mk(binary(), #{ - in => query, - desc => <<"File reference">>, - example => <<"file1">>, - required => true - })} - ]; -fields(file_node) -> - [ - {node, - hoconsc:mk(binary(), #{ - in => query, - desc => <<"Node under which the file is located">>, - example => atom_to_list(node()), - required => true - })} - ]. - roots() -> - [ - file_node, - file_ref - ]. - -mk_file_uri(_Options, Node, Filepath) -> - % TODO: qualify with `?BASE_PATH` - [ - "/file_transfer/file?", - uri_string:compose_query([ - {"node", atom_to_list(Node)}, - {"fileref", Filepath} - ]) - ]. + []. %%-------------------------------------------------------------------- %% Helpers @@ -207,33 +110,3 @@ format_export_info( format_timestamp(Timestamp) -> iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). - -parse_node(NodeBin) -> - case emqx_misc:safe_to_existing_atom(NodeBin) of - {ok, Node} -> - Node; - {error, _} -> - throw({invalid, NodeBin}) - end. - -parse_filepath(PathBin) -> - case filename:pathtype(PathBin) of - relative -> - ok; - absolute -> - throw({invalid, PathBin}) - end, - PathComponents = filename:split(PathBin), - case lists:any(fun is_special_component/1, PathComponents) of - false -> - filename:join(PathComponents); - true -> - throw({invalid, PathBin}) - end. - -is_special_component(<<".", _/binary>>) -> - true; -is_special_component([$. | _]) -> - true; -is_special_component(_) -> - false. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 5489a232a..a86116b02 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -106,11 +106,15 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> {error, _} = Error -> {stop, {shutdown, Error}} end; -handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) -> - Filemeta = emqx_ft_assembly:filemeta(Asm), - Coverage = emqx_ft_assembly:coverage(Asm), +handle_event(internal, _, start_assembling, St = #st{}) -> + Filemeta = emqx_ft_assembly:filemeta(St#st.assembly), + Coverage = emqx_ft_assembly:coverage(St#st.assembly), % TODO: better error handling - {ok, Export} = export_start(Filemeta, St), + {ok, Export} = emqx_ft_storage_exporter:start_export( + St#st.storage, + St#st.transfer, + Filemeta + ), {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])}; handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO @@ -119,17 +123,17 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO: pipelining % TODO: better error handling {ok, Content} = pread(Node, Segment, St), - {ok, NExport} = export_write(St#st.export, Content), + {ok, NExport} = emqx_ft_storage_exporter:write(St#st.export, Content), {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])}; handle_event(internal, _, {assemble, []}, St = #st{}) -> {next_state, complete, St, ?internal([])}; handle_event(internal, _, complete, St = #st{}) -> - Result = export_complete(St#st.export), + Result = emqx_ft_storage_exporter:complete(St#st.export), ok = maybe_garbage_collect(Result, St), {stop, {shutdown, Result}, St#st{export = undefined}}. terminate(_Reason, _StateName, #st{export = Export}) -> - Export /= undefined andalso export_discard(Export). + Export /= undefined andalso emqx_ft_storage_exporter:discard(Export). pread(Node, Segment, St) when Node =:= node() -> emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); @@ -138,31 +142,6 @@ pread(Node, Segment, St) -> %% -export_start(Filemeta, #st{storage = Storage, transfer = Transfer}) -> - {ExporterMod, Exporter} = emqx_ft_storage_fs:exporter(Storage), - case ExporterMod:start_export(Exporter, Transfer, Filemeta) of - {ok, Export} -> - {ok, {ExporterMod, Export}}; - {error, _} = Error -> - Error - end. - -export_write({ExporterMod, Export}, Content) -> - case ExporterMod:write(Export, Content) of - {ok, ExportNext} -> - {ok, {ExporterMod, ExportNext}}; - {error, _} = Error -> - Error - end. - -export_complete({ExporterMod, Export}) -> - ExporterMod:complete(Export). - -export_discard({ExporterMod, Export}) -> - ExporterMod:discard(Export). - -%% - maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> Nodes = emqx_ft_assembly:nodes(Asm), emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes); diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 1d1c08ce9..05a053fbf 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -26,6 +26,7 @@ exports/0, + with_storage_type/2, with_storage_type/3 ] ). @@ -33,7 +34,9 @@ -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). +-export_type([export_info/0]). -export_type([export_data/0]). +-export_type([reader/0]). -type assemble_callback() :: fun((ok | {error, term()}) -> any()). @@ -46,6 +49,7 @@ }. -type export_data() :: binary() | qlc:query_handle(). +-type reader() :: pid(). %%-------------------------------------------------------------------- %% Behaviour @@ -106,11 +110,17 @@ exports() -> Mod = mod(), Mod:exports(storage()). --spec with_storage_type(atom(), atom(), list(term())) -> any(). +-spec with_storage_type(atom(), atom() | function()) -> any(). +with_storage_type(Type, Fun) -> + with_storage_type(Type, Fun, []). + +-spec with_storage_type(atom(), atom() | function(), list(term())) -> any(). with_storage_type(Type, Fun, Args) -> Storage = storage(), case Storage of - #{type := Type} -> + #{type := Type} when is_function(Fun) -> + apply(Fun, [Storage | Args]); + #{type := Type} when is_atom(Fun) -> Mod = mod(Storage), apply(Mod, Fun, [Storage | Args]); disabled -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl new file mode 100644 index 000000000..97e260e3a --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% Filesystem storage exporter +%% +%% This is conceptually a part of the Filesystem storage backend that defines +%% how and where complete tranfers are assembled into files and stored. + +-module(emqx_ft_storage_exporter). + +%% Export API +-export([start_export/3]). +-export([write/2]). +-export([complete/1]). +-export([discard/1]). + +%% Listing API +-export([list/1]). +% TODO +% -export([list/2]). + +-export([exporter/1]). + +-type storage() :: emxt_ft_storage_fs:storage(). +-type transfer() :: emqx_ft:transfer(). +-type filemeta() :: emqx_ft:filemeta(). + +-type options() :: map(). +-type export() :: term(). + +-callback start_export(options(), transfer(), filemeta()) -> + {ok, export()} | {error, _Reason}. + +-callback write(ExportSt :: export(), iodata()) -> + {ok, ExportSt :: export()} | {error, _Reason}. + +-callback complete(ExportSt :: export()) -> + ok | {error, _Reason}. + +-callback discard(ExportSt :: export()) -> + ok | {error, _Reason}. + +-callback list(options()) -> + {ok, [emqx_ft_storage:export_info()]} | {error, _Reason}. + +%% + +start_export(Storage, Transfer, Filemeta) -> + {ExporterMod, Exporter} = exporter(Storage), + case ExporterMod:start_export(Exporter, Transfer, Filemeta) of + {ok, ExportSt} -> + {ok, {ExporterMod, ExportSt}}; + {error, _} = Error -> + Error + end. + +write({ExporterMod, ExportSt}, Content) -> + case ExporterMod:write(ExportSt, Content) of + {ok, ExportStNext} -> + {ok, {ExporterMod, ExportStNext}}; + {error, _} = Error -> + Error + end. + +complete({ExporterMod, ExportSt}) -> + ExporterMod:complete(ExportSt). + +discard({ExporterMod, ExportSt}) -> + ExporterMod:discard(ExportSt). + +%% + +list(Storage) -> + {ExporterMod, ExporterOpts} = exporter(Storage), + ExporterMod:list(ExporterOpts). + +%% + +-spec exporter(storage()) -> {module(), _ExporterOptions}. +exporter(Storage) -> + case maps:get(exporter, Storage) of + #{type := local} = Options -> + {emqx_ft_storage_exporter_fs, Options} + end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 6456e2ba5..7cf0ef6d5 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -30,6 +30,7 @@ -export([start_reader/3]). -export([list/1]). +% TODO % -export([list/2]). -export_type([export/0]). @@ -228,7 +229,7 @@ mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) -> #{ transfer => Transfer, name => Filename, - uri => mk_export_uri(Options, RelFilepath), + uri => mk_export_uri(RelFilepath), timestamp => Fileinfo#file_info.mtime, size => Fileinfo#file_info.size, path => filename:join(Root, RelFilepath) @@ -247,15 +248,14 @@ try_read_filemeta(Filepath, Info) -> Info end. -mk_export_uri(Options, RelFilepath) -> - % emqx_ft_storage_exporter_fs_api:mk_export_uri(Options, RelFilepath). - emqx_ft_api:mk_file_uri(Options, node(), RelFilepath). +mk_export_uri(RelFilepath) -> + emqx_ft_storage_exporter_fs_api:mk_export_uri(node(), RelFilepath). -spec start_reader(options(), file:name(), _Caller :: pid()) -> {ok, reader()} | {error, enoent}. -start_reader(Options, Filepath, CallerPid) -> +start_reader(Options, RelFilepath, CallerPid) -> Root = get_storage_root(Options), - case filelib:safe_relative_path(Filepath, Root) of + case filelib:safe_relative_path(RelFilepath, Root) of SafeFilepath when SafeFilepath /= unsafe -> AbsFilepath = filename:join(Root, SafeFilepath), emqx_ft_storage_fs_reader:start_supervised(CallerPid, AbsFilepath); @@ -269,7 +269,7 @@ start_reader(Options, Filepath, CallerPid) -> {ok, [exportinfo(), ...]} | {error, file_error()}. list(_Options) -> Nodes = mria_mnesia:running_nodes(), - Results = emqx_ft_storage_fs_proto_v1:list_exports(Nodes), + Results = emqx_ft_storage_exporter_fs_proto_v1:list_exports(Nodes), {GoodResults, BadResults} = lists:partition( fun ({_Node, {ok, {ok, _}}}) -> true; diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl new file mode 100644 index 000000000..b7ad86436 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_fs_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% Swagger specs from hocon schema +-export([ + api_spec/0, + paths/0, + schema/1, + namespace/0 +]). + +-export([ + fields/1, + roots/0 +]). + +%% API callbacks +-export([ + '/file_transfer/file'/2 +]). + +-export([mk_export_uri/2]). + +%% + +namespace() -> "file_transfer". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/file_transfer/file" + ]. + +schema("/file_transfer/file") -> + #{ + 'operationId' => '/file_transfer/file', + get => #{ + tags => [<<"file_transfer">>], + summary => <<"Download a particular file">>, + description => ?DESC("file_get"), + parameters => [ + hoconsc:ref(file_node), + hoconsc:ref(file_ref) + ], + responses => #{ + 200 => <<"Operation success">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>), + 503 => emqx_dashboard_swagger:error_codes( + ['SERVICE_UNAVAILABLE'], <<"Service unavailable">> + ) + } + } + }. + +roots() -> + [ + file_node, + file_ref + ]. + +-spec fields(hocon_schema:name()) -> hocon_schema:fields(). +fields(file_ref) -> + [ + {fileref, + hoconsc:mk(binary(), #{ + in => query, + desc => <<"File reference">>, + example => <<"file1">>, + required => true + })} + ]; +fields(file_node) -> + [ + {node, + hoconsc:mk(binary(), #{ + in => query, + desc => <<"Node under which the file is located">>, + example => atom_to_list(node()), + required => true + })} + ]. + +'/file_transfer/file'(get, #{query_string := Query}) -> + try + Node = parse_node(maps:get(<<"node">>, Query)), + Filepath = parse_filepath(maps:get(<<"fileref">>, Query)), + case emqx_ft_storage_exporter_fs_proto_v1:read_export_file(Node, Filepath, self()) of + {ok, ReaderPid} -> + FileData = emqx_ft_storage_fs_reader:table(ReaderPid), + {200, + #{ + <<"content-type">> => <<"application/data">>, + <<"content-disposition">> => <<"attachment">> + }, + FileData}; + {error, enoent} -> + {404, error_msg('NOT_FOUND', <<"Not found">>)}; + {error, Error} -> + ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}), + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end + catch + throw:{invalid, Param} -> + {404, + error_msg( + 'NOT_FOUND', + iolist_to_binary(["Invalid query parameter: ", Param]) + )}; + error:{erpc, noconnection} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end. + +error_msg(Code, Msg) -> + #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. + +-spec mk_export_uri(node(), file:name()) -> + uri_string:uri_string(). +mk_export_uri(Node, Filepath) -> + emqx_dashboard_swagger:relative_uri([ + "/file_transfer/file?", + uri_string:compose_query([ + {"node", atom_to_list(Node)}, + {"fileref", Filepath} + ]) + ]). + +%% + +parse_node(NodeBin) -> + case emqx_misc:safe_to_existing_atom(NodeBin) of + {ok, Node} -> + Node; + {error, _} -> + throw({invalid, NodeBin}) + end. + +parse_filepath(PathBin) -> + case filename:pathtype(PathBin) of + relative -> + ok; + absolute -> + throw({invalid, PathBin}) + end, + PathComponents = filename:split(PathBin), + case lists:any(fun is_special_component/1, PathComponents) of + false -> + filename:join(PathComponents); + true -> + throw({invalid, PathBin}) + end. + +is_special_component(<<".", _/binary>>) -> + true; +is_special_component([$. | _]) -> + true; +is_special_component(_) -> + false. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl new file mode 100644 index 000000000..bdb64f543 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_proxy.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% This methods are called via rpc by `emqx_ft_storage_exporter_fs` +%% They populate the call with actual storage which may be configured differently +%% on a concrete node. + +-module(emqx_ft_storage_exporter_fs_proxy). + +-export([ + list_exports_local/0, + read_export_file_local/2 +]). + +list_exports_local() -> + emqx_ft_storage:with_storage_type(local, fun(Storage) -> + case emqx_ft_storage_exporter:exporter(Storage) of + {emqx_ft_storage_exporter_fs, Options} -> + emqx_ft_storage_exporter_fs:list_local(Options); + InvalidExporter -> + {error, {invalid_exporter, InvalidExporter}} + end + end). + +read_export_file_local(Filepath, CallerPid) -> + emqx_ft_storage:with_storage_type(local, fun(Storage) -> + case emqx_ft_storage_exporter:exporter(Storage) of + {emqx_ft_storage_exporter_fs, Options} -> + emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid); + InvalidExporter -> + {error, {invalid_exporter, InvalidExporter}} + end + end). diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 6c5b6962c..b328cfd2b 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -44,12 +44,7 @@ -export([get_subdir/2]). -export([get_subdir/3]). --export([exporter/1]). - -% Exporter-specific API -export([exports/1]). --export([exports_local/1]). --export([exports_local/2]). -export_type([storage/0]). -export_type([filefrag/1]). @@ -214,24 +209,8 @@ assemble(Storage, Transfer, Size) -> %% --spec exporter(storage()) -> {module(), _ExporterOptions}. -exporter(Storage) -> - case maps:get(exporter, Storage) of - #{type := local} = Options -> - {emqx_ft_storage_exporter_fs, Options} - end. - exports(Storage) -> - {ExporterMod, ExporterOpts} = exporter(Storage), - ExporterMod:list(ExporterOpts). - -exports_local(Storage) -> - {ExporterMod, ExporterOpts} = exporter(Storage), - ExporterMod:list_local(ExporterOpts). - -exports_local(Storage, Transfer) -> - {ExporterMod, ExporterOpts} = exporter(Storage), - ExporterMod:list_local(ExporterOpts, Transfer). + emqx_ft_storage_exporter:list(Storage). %% diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl index dbd0cd6dc..597f84091 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl @@ -22,9 +22,7 @@ -export([ list_local/2, - pread_local/4, - list_exports_local/0, - read_export_file_local/2 + pread_local/4 ]). list_local(Transfer, What) -> @@ -32,19 +30,3 @@ list_local(Transfer, What) -> pread_local(Transfer, Frag, Offset, Size) -> emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). - -list_exports_local() -> - case emqx_ft_storage:with_storage_type(local, exporter, []) of - {emqx_ft_storage_exporter_fs, Options} -> - emqx_ft_storage_exporter_fs:list_local(Options); - InvalidExporter -> - {error, {invalid_exporter, InvalidExporter}} - end. - -read_export_file_local(Filepath, CallerPid) -> - case emqx_ft_storage:with_storage_type(local, exporter, []) of - {emqx_ft_storage_exporter_fs, Options} -> - emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid); - InvalidExporter -> - {error, {invalid_exporter, InvalidExporter}} - end. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl new file mode 100644 index 000000000..ee7a857be --- /dev/null +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_exporter_fs_proto_v1.erl @@ -0,0 +1,51 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_fs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([list_exports/1]). +-export([read_export_file/3]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.17". + +-spec list_exports([node()]) -> + emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]). +list_exports(Nodes) -> + erpc:multicall( + Nodes, + emqx_ft_storage_exporter_fs_proxy, + list_exports_local, + [] + ). + +-spec read_export_file(node(), file:name(), pid()) -> + {ok, emqx_ft_storage:reader()} + | {error, term()} + | no_return(). +read_export_file(Node, Filepath, CallerPid) -> + erpc:call( + Node, + emqx_ft_storage_exporter_fs_proxy, + read_export_file_local, + [Filepath, CallerPid] + ). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index f152928fe..3b91684e6 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -23,10 +23,6 @@ -export([multilist/3]). -export([pread/5]). -%% TODO: These should be defined in a separate BPAPI --export([list_exports/1]). --export([read_export_file/3]). - -type offset() :: emqx_ft:offset(). -type transfer() :: emqx_ft:transfer(). -type filefrag() :: emqx_ft_storage_fs:filefrag(). @@ -45,17 +41,3 @@ multilist(Nodes, Transfer, What) -> {ok, [filefrag()]} | {error, term()} | no_return(). pread(Node, Transfer, Frag, Offset, Size) -> erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). - -%% - --spec list_exports([node()]) -> - emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]). -list_exports(Nodes) -> - erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_exports_local, []). - --spec read_export_file(node(), file:name(), pid()) -> - {ok, emqx_ft_storage:export_data()} - | {error, term()} - | no_return(). -read_export_file(Node, Filepath, CallerPid) -> - erpc:call(Node, emqx_ft_storage_fs_proxy, read_export_file_local, [Filepath, CallerPid]). diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 56622f3cd..a1bef2a2a 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -24,7 +24,7 @@ -include_lib("emqx/include/asserts.hrl"). --import(emqx_mgmt_api_test_util, [uri/1]). +-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). all() -> emqx_common_test_helpers:all(?MODULE). @@ -111,7 +111,7 @@ t_download_transfer(Config) -> {ok, 200, #{<<"files">> := [File]}} = request(get, uri(["file_transfer", "files"]), fun json/1), - {ok, 200, Response} = request(get, uri([]) ++ maps:get(<<"uri">>, File)), + {ok, 200, Response} = request(get, host() ++ maps:get(<<"uri">>, File)), ?assertEqual( <<"data">>, diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 4c7c0f08c..7c5d14b38 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -86,8 +86,7 @@ t_assemble_empty_transfer(Config) -> ), Status = complete_assemble(Storage, Transfer, 0), ?assertEqual({shutdown, ok}, Status), - {ok, [_Result = #{size := _Size = 0}]} = - emqx_ft_storage_fs:exports_local(Storage, Transfer), + {ok, [_Result = #{size := _Size = 0}]} = list_exports(Config, Transfer), % ?assertEqual( % {error, eof}, % emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size) @@ -138,10 +137,9 @@ t_assemble_complete_local_transfer(Config) -> meta := #{} } ]}, - emqx_ft_storage_fs:exports_local(Storage, Transfer) + list_exports(Config, Transfer) ), - {ok, [#{path := AssemblyFilename}]} = - emqx_ft_storage_fs:exports_local(Storage, Transfer), + {ok, [#{path := AssemblyFilename}]} = list_exports(Config, Transfer), ?assertMatch( {ok, #file_info{type = regular, size = TransferSize}}, file:read_file_info(AssemblyFilename) @@ -193,8 +191,7 @@ complete_assemble(Storage, Transfer, Size, Timeout) -> %% t_list_transfers(Config) -> - Storage = storage(Config), - {ok, Exports} = emqx_ft_storage_fs:exports_local(Storage), + {ok, Exports} = list_exports(Config), ?assertMatch( [ #{ @@ -237,6 +234,17 @@ inspect_file(Filename) -> mk_fileid() -> integer_to_binary(erlang:system_time(millisecond)). +list_exports(Config) -> + {emqx_ft_storage_exporter_fs, Options} = exporter(Config), + emqx_ft_storage_exporter_fs:list_local(Options). + +list_exports(Config, Transfer) -> + {emqx_ft_storage_exporter_fs, Options} = exporter(Config), + emqx_ft_storage_exporter_fs:list_local(Options, Transfer). + +exporter(Config) -> + emqx_ft_storage_exporter:exporter(storage(Config)). + storage(Config) -> #{ type => local,