From 4132f5a5fb3127878532d436613a20026e8da7b4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 Mar 2023 15:27:50 +0300 Subject: [PATCH] feat(ft): introduce exporter concept in local storage backend The exporter is responsible for keeping fully transferred and successfully assembled files. This was on the local storage itself before. This abstraction is needed to give us an ability to support S3 destinations more easily, just by swapping the storage exporter. Also implement local filesystem exporter and reimplement parts of the `emqx_ft` API on top of it. --- apps/emqx_ft/etc/emqx_ft.conf | 3 + apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf | 38 +- apps/emqx_ft/src/emqx_ft.erl | 8 +- apps/emqx_ft/src/emqx_ft_api.erl | 174 ++++++--- apps/emqx_ft/src/emqx_ft_assembler.erl | 59 ++- apps/emqx_ft/src/emqx_ft_fs_util.erl | 101 +++++ apps/emqx_ft/src/emqx_ft_schema.erl | 26 +- apps/emqx_ft/src/emqx_ft_storage.erl | 35 +- .../src/emqx_ft_storage_exporter_fs.erl | 352 ++++++++++++++++++ apps/emqx_ft/src/emqx_ft_storage_fs.erl | 281 +++----------- apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl | 22 +- apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl | 22 +- .../src/proto/emqx_ft_storage_fs_proto_v1.erl | 29 +- apps/emqx_ft/test/emqx_ft_SUITE.erl | 67 ++-- apps/emqx_ft/test/emqx_ft_api_SUITE.erl | 66 ++-- apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl | 48 ++- apps/emqx_ft/test/emqx_ft_conf_SUITE.erl | 11 +- .../emqx_ft/test/emqx_ft_storage_fs_SUITE.erl | 69 +--- .../test/emqx_ft_storage_fs_gc_SUITE.erl | 12 +- apps/emqx_ft/test/emqx_ft_test_helpers.erl | 15 +- 20 files changed, 936 insertions(+), 502 deletions(-) create mode 100644 apps/emqx_ft/src/emqx_ft_fs_util.erl create mode 100644 apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl diff --git a/apps/emqx_ft/etc/emqx_ft.conf b/apps/emqx_ft/etc/emqx_ft.conf index 250dca6a9..8d921e79c 100644 --- a/apps/emqx_ft/etc/emqx_ft.conf +++ b/apps/emqx_ft/etc/emqx_ft.conf @@ -1,5 +1,8 @@ file_transfer { storage { type = local + exporter 
{ + type = local + } } } diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index dd2d2a1dc..15c42dcfa 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -13,7 +13,7 @@ emqx_ft_schema { local_type { desc { - en: "Use local file system to store uploaded files and temporary data." + en: "Use local file system to store uploaded fragments and temporary data." zh: "使用本地文件系统来存储上传的文件和临时数据。" } label: { @@ -24,7 +24,7 @@ emqx_ft_schema { local_storage_root { desc { - en: "File system path to keep uploaded files and temporary data." + en: "File system path to keep uploaded fragments and temporary data." zh: "保存上传文件和临时数据的文件系统路径。" } label: { @@ -33,6 +33,40 @@ emqx_ft_schema { } } + local_storage_exporter { + desc { + en: "Exporter for the local file system storage backend.<br/>
" + "Exporter defines where and how fully transferred and assembled files are stored." + zh: "" + } + label: { + en: "Local Storage Exporter" + zh: "" + } + } + + local_storage_exporter_type { + desc { + en: "Type of the Exporter to use." + zh: "" + } + label: { + en: "Local Storage Exporter Type" + zh: "" + } + } + + local_storage_exporter_root { + desc { + en: "File system path to keep uploaded files." + zh: "" + } + label: { + en: "Local Filesystem Exporter Root" + zh: "" + } + } + local_storage_gc { desc { en: "Garbage collection settings for the intermediate and temporary files in the local file system." diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 518807d9f..b7c8f0eac 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -33,7 +33,8 @@ ]). -export([ - decode_filemeta/1 + decode_filemeta/1, + encode_filemeta/1 ]). -export([on_complete/4]). @@ -114,6 +115,11 @@ decode_filemeta(Map) when is_map(Map) -> {error, {invalid_filemeta, Error}} end. +encode_filemeta(Meta = #{}) -> + % TODO: Looks like this should be hocon's responsibility. + Schema = emqx_ft_schema:schema(filemeta), + hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}). + %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index ddc6e761a..143d629de 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -40,6 +40,10 @@ '/file_transfer/file'/2 ]). +-export([ + mk_file_uri/3 +]). + -import(hoconsc, [mk/2, ref/1, ref/2]). namespace() -> "file_transfer". @@ -69,6 +73,11 @@ schema("/file_transfer/files") -> } }; schema("/file_transfer/file") -> + % TODO + % This is conceptually another API, because this logic is inherent only to the + % local filesystem exporter. 
Ideally, we won't even publish it if `emqx_ft` is + % configured with another exporter. Accordingly, things that look too specific + % for this module (i.e. `parse_filepath/1`) should go away in another API module. #{ 'operationId' => '/file_transfer/file', get => #{ @@ -77,8 +86,7 @@ schema("/file_transfer/file") -> description => ?DESC("file_get"), parameters => [ ref(file_node), - ref(file_clientid), - ref(file_id) + ref(file_ref) ], responses => #{ 200 => <<"Operation success">>, @@ -91,32 +99,40 @@ schema("/file_transfer/file") -> }. '/file_transfer/files'(get, #{}) -> - case emqx_ft_storage:ready_transfers() of + case emqx_ft_storage:exports() of {ok, Transfers} -> - FormattedTransfers = lists:map( - fun({Id, Info}) -> - #{id => Id, info => format_file_info(Info)} - end, - Transfers - ), - {200, #{<<"files">> => FormattedTransfers}}; + {200, #{<<"files">> => lists:map(fun format_export_info/1, Transfers)}}; {error, _} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} end. 
'/file_transfer/file'(get, #{query_string := Query}) -> - case emqx_ft_storage:get_ready_transfer(Query) of - {ok, FileData} -> - {200, - #{ - <<"content-type">> => <<"application/data">>, - <<"content-disposition">> => <<"attachment">> - }, - FileData}; - {error, enoent} -> - {404, error_msg('NOT_FOUND', <<"Not found">>)}; - {error, Error} -> - ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}), + try + Node = parse_node(maps:get(<<"node">>, Query)), + Filepath = parse_filepath(maps:get(<<"fileref">>, Query)), + case emqx_ft_storage_fs_proto_v1:read_export_file(Node, Filepath, self()) of + {ok, ReaderPid} -> + FileData = emqx_ft_storage_fs_reader:table(ReaderPid), + {200, + #{ + <<"content-type">> => <<"application/data">>, + <<"content-disposition">> => <<"attachment">> + }, + FileData}; + {error, enoent} -> + {404, error_msg('NOT_FOUND', <<"Not found">>)}; + {error, Error} -> + ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}), + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end + catch + throw:{invalid, Param} -> + {404, + error_msg( + 'NOT_FOUND', + iolist_to_binary(["Invalid query parameter: ", Param]) + )}; + error:{erpc, noconnection} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} end. @@ -124,46 +140,100 @@ error_msg(Code, Msg) -> #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. -spec fields(hocon_schema:name()) -> hocon_schema:fields(). 
+fields(file_ref) -> + [ + {fileref, + hoconsc:mk(binary(), #{ + in => query, + desc => <<"File reference">>, + example => <<"file1">>, + required => true + })} + ]; fields(file_node) -> - Desc = <<"File Node">>, - Meta = #{ - in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false - }, - [{node, hoconsc:mk(binary(), Meta)}]; -fields(file_clientid) -> - Desc = <<"File ClientId">>, - Meta = #{ - in => query, desc => Desc, example => <<"client1">>, required => false - }, - [{clientid, hoconsc:mk(binary(), Meta)}]; -fields(file_id) -> - Desc = <<"File">>, - Meta = #{ - in => query, desc => Desc, example => <<"file1">>, required => false - }, - [{fileid, hoconsc:mk(binary(), Meta)}]. + [ + {node, + hoconsc:mk(binary(), #{ + in => query, + desc => <<"Node under which the file is located">>, + example => atom_to_list(node()), + required => true + })} + ]. roots() -> [ file_node, - file_clientid, - file_id + file_ref + ]. + +mk_file_uri(_Options, Node, Filepath) -> + % TODO: qualify with `?BASE_PATH` + [ + "/file_transfer/file?", + uri_string:compose_query([ + {"node", atom_to_list(Node)}, + {"fileref", Filepath} + ]) ]. %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- -format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) -> - #{ - path => Path, +format_export_info( + Info = #{ + name := Name, + size := Size, + uri := URI, + timestamp := Timestamp, + transfer := {ClientId, FileId} + } +) -> + Res = #{ + name => iolist_to_binary(Name), size => Size, - timestamp => format_datetime(Timestamp) - }. + timestamp => format_timestamp(Timestamp), + clientid => ClientId, + fileid => FileId, + uri => iolist_to_binary(URI) + }, + case Info of + #{meta := Meta} -> + Res#{metadata => emqx_ft:encode_filemeta(Meta)}; + #{} -> + Res + end. 
-format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) -> - iolist_to_binary( - io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [ - Year, Month, Day, Hour, Minute, Second - ]) - ). +format_timestamp(Timestamp) -> + iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])). + +parse_node(NodeBin) -> + case emqx_misc:safe_to_existing_atom(NodeBin) of + {ok, Node} -> + Node; + {error, _} -> + throw({invalid, NodeBin}) + end. + +parse_filepath(PathBin) -> + case filename:pathtype(PathBin) of + relative -> + ok; + absolute -> + throw({invalid, PathBin}) + end, + PathComponents = filename:split(PathBin), + case lists:any(fun is_special_component/1, PathComponents) of + false -> + filename:join(PathComponents); + true -> + throw({invalid, PathBin}) + end. + +is_special_component(<<".", _/binary>>) -> + true; +is_special_component([$. | _]) -> + true; +is_special_component(_) -> + false. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index ff845fee9..5489a232a 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -22,13 +22,13 @@ -export([callback_mode/0]). -export([init/1]). -export([handle_event/4]). +-export([terminate/3]). -record(st, { storage :: _Storage, transfer :: emqx_ft:transfer(), assembly :: emqx_ft_assembly:t(), - file :: {file:filename(), io:device(), term()} | undefined, - hash + export :: _Export | undefined }). -define(NAME(Transfer), {n, l, {?MODULE, Transfer}}). @@ -47,11 +47,11 @@ callback_mode() -> handle_event_function. init({Storage, Transfer, Size}) -> + _ = erlang:process_flag(trap_exit, true), St = #st{ storage = Storage, transfer = Transfer, - assembly = emqx_ft_assembly:new(Size), - hash = crypto:hash_init(sha256) + assembly = emqx_ft_assembly:new(Size) }, {ok, idle, St}. 
@@ -61,10 +61,10 @@ handle_event(info, kickoff, idle, St) -> % We could wait for this message and handle it at the end of the assembling rather than at % the beginning, however it would make error handling much more messier. {next_state, list_local_fragments, St, ?internal([])}; -handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> +handle_event(internal, _, list_local_fragments, St = #st{}) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)? {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment), - NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), + NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(St#st.assembly, node(), Fragments)), NSt = St#st{assembly = NAsm}, case emqx_ft_assembly:status(NAsm) of complete -> @@ -110,8 +110,8 @@ handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Coverage = emqx_ft_assembly:coverage(Asm), % TODO: better error handling - {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta), - {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])}; + {ok, Export} = export_start(Filemeta, St), + {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])}; handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO % Currently, race is possible between getting segment info from the remote node and @@ -119,15 +119,17 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO: pipelining % TODO: better error handling {ok, Content} = pread(Node, Segment, St), - {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), - {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}; + {ok, NExport} = export_write(St#st.export, Content), + {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])}; handle_event(internal, _, {assemble, []}, St = 
#st{}) -> {next_state, complete, St, ?internal([])}; -handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) -> - Filemeta = emqx_ft_assembly:filemeta(Asm), - Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), +handle_event(internal, _, complete, St = #st{}) -> + Result = export_complete(St#st.export), ok = maybe_garbage_collect(Result, St), - {stop, {shutdown, Result}}. + {stop, {shutdown, Result}, St#st{export = undefined}}. + +terminate(_Reason, _StateName, #st{export = Export}) -> + Export /= undefined andalso export_discard(Export). pread(Node, Segment, St) when Node =:= node() -> emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); @@ -136,8 +138,33 @@ pread(Node, Segment, St) -> %% -maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) -> - Nodes = emqx_ft_assembly:nodes(St#st.assembly), +export_start(Filemeta, #st{storage = Storage, transfer = Transfer}) -> + {ExporterMod, Exporter} = emqx_ft_storage_fs:exporter(Storage), + case ExporterMod:start_export(Exporter, Transfer, Filemeta) of + {ok, Export} -> + {ok, {ExporterMod, Export}}; + {error, _} = Error -> + Error + end. + +export_write({ExporterMod, Export}, Content) -> + case ExporterMod:write(Export, Content) of + {ok, ExportNext} -> + {ok, {ExporterMod, ExportNext}}; + {error, _} = Error -> + Error + end. + +export_complete({ExporterMod, Export}) -> + ExporterMod:complete(Export). + +export_discard({ExporterMod, Export}) -> + ExporterMod:discard(Export). + +%% + +maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> + Nodes = emqx_ft_assembly:nodes(Asm), emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes); maybe_garbage_collect({error, _}, _St) -> ok. 
diff --git a/apps/emqx_ft/src/emqx_ft_fs_util.erl b/apps/emqx_ft/src/emqx_ft_fs_util.erl new file mode 100644 index 000000000..198f4ccc5 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_fs_util.erl @@ -0,0 +1,101 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_fs_util). + +-include_lib("snabbkaffe/include/trace.hrl"). + +-export([read_decode_file/2]). + +-export([fold/4]). + +-type glob() :: ['*' | globfun()]. +-type globfun() :: + fun((_Filename :: file:name()) -> boolean()). +-type foldfun(Acc) :: + fun( + ( + _Filepath :: file:name(), + _Info :: file:file_info() | {error, _IoError}, + _Stack :: [file:name()], + Acc + ) -> Acc + ). + +%% + +-spec read_decode_file(file:name(), fun((binary()) -> Value)) -> + {ok, Value} | {error, _IoError}. +read_decode_file(Filepath, DecodeFun) -> + case file:read_file(Filepath) of + {ok, Content} -> + safe_decode(Content, DecodeFun); + {error, _} = Error -> + Error + end. + +safe_decode(Content, DecodeFun) -> + try + {ok, DecodeFun(Content)} + catch + C:E:Stacktrace -> + ?tp(warning, "safe_decode_failed", #{ + class => C, + exception => E, + stacktrace => Stacktrace + }), + {error, corrupted} + end. + +-spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) -> + Acc. 
+fold(Fun, Acc, Root, Glob) -> + fold(Fun, Acc, [], Root, Glob, []). + +fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) -> + case file:list_dir(filename:join(Root, Path)) of + {ok, Filenames} -> + lists:foldl( + fun(FN, Acc) -> + case matches_glob(Glob, FN) of + true when Path == [] -> + fold(Fun, Acc, FN, Root, Rest, [FN | Stack]); + true -> + fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]); + false -> + Acc + end + end, + AccIn, + Filenames + ); + {error, enotdir} -> + AccIn; + {error, Reason} -> + Fun(Path, {error, Reason}, Stack, AccIn) + end; +fold(Fun, AccIn, Filepath, Root, [], Stack) -> + case file:read_link_info(filename:join(Root, Filepath), [{time, posix}, raw]) of + {ok, Info} -> + Fun(Filepath, Info, Stack, AccIn); + {error, Reason} -> + Fun(Filepath, {error, Reason}, Stack, AccIn) + end. + +matches_glob('*', _) -> + true; +matches_glob(FilterFun, Filename) when is_function(FilterFun) -> + FilterFun(Filename). diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index d2b2b9299..37e2adafc 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -66,12 +66,33 @@ fields(local_storage) -> desc => ?DESC("local_storage_root"), required => false }}, + {exporter, #{ + type => hoconsc:union([ + ?REF(local_storage_exporter) + ]), + desc => ?DESC("local_storage_exporter"), + required => true + }}, {gc, #{ - type => hoconsc:ref(?MODULE, local_storage_gc), + type => ?REF(local_storage_gc), desc => ?DESC("local_storage_gc"), required => false }} ]; +fields(local_storage_exporter) -> + [ + {type, #{ + type => local, + default => local, + required => false, + desc => ?DESC("local_storage_exporter_type") + }}, + {root, #{ + type => binary(), + desc => ?DESC("local_storage_exporter_root"), + required => false + }} + ]; fields(local_storage_gc) -> [ {interval, #{ @@ -101,12 +122,15 @@ desc(file_transfer) -> "File transfer settings"; 
desc(local_storage) -> "File transfer local storage settings"; +desc(local_storage_exporter) -> + "Exporter settings for the File transfer local storage backend"; desc(local_storage_gc) -> "Garbage collection settings for the File transfer local storage backend". schema(filemeta) -> #{ roots => [ + % TODO nonempty {name, hoconsc:mk(string(), #{required => true})}, {size, hoconsc:mk(non_neg_integer())}, {expire_at, hoconsc:mk(non_neg_integer())}, diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 0b8c38736..1d1c08ce9 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -24,8 +24,7 @@ store_segment/2, assemble/2, - ready_transfers/0, - get_ready_transfer/1, + exports/0, with_storage_type/3 ] @@ -34,12 +33,19 @@ -type storage() :: emqx_config:config(). -export_type([assemble_callback/0]). +-export_type([export_data/0]). -type assemble_callback() :: fun((ok | {error, term()}) -> any()). --type ready_transfer_id() :: term(). --type ready_transfer_info() :: map(). --type ready_transfer_data() :: binary() | qlc:query_handle(). +-type export_info() :: #{ + transfer := emqx_ft:transfer(), + name := file:name(), + size := _Bytes :: non_neg_integer(), + uri => uri_string:uri_string(), + meta => emqx_ft:filemeta() +}. + +-type export_data() :: binary() | qlc:query_handle(). %%-------------------------------------------------------------------- %% Behaviour @@ -57,10 +63,9 @@ ok | {async, pid()} | {error, term()}. -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. --callback ready_transfers(storage()) -> - {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. --callback get_ready_transfer(storage(), ready_transfer_id()) -> - {ok, ready_transfer_data()} | {error, term()}. + +-callback exports(storage()) -> + {ok, [export_info()]} | {error, term()}. 
%%-------------------------------------------------------------------- %% API @@ -95,15 +100,11 @@ assemble(Transfer, Size) -> Mod = mod(), Mod:assemble(storage(), Transfer, Size). --spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}. -ready_transfers() -> +-spec exports() -> + {ok, [export_info()]} | {error, term()}. +exports() -> Mod = mod(), - Mod:ready_transfers(storage()). - --spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}. -get_ready_transfer(ReadyTransferId) -> - Mod = mod(), - Mod:get_ready_transfer(storage(), ReadyTransferId). + Mod:exports(storage()). -spec with_storage_type(atom(), atom(), list(term())) -> any(). with_storage_type(Type, Fun, Args) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl new file mode 100644 index 000000000..e0bffd444 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -0,0 +1,352 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_fs). + +-include_lib("kernel/include/file.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% Exporter API +-export([start_export/3]). +-export([write/2]). 
+-export([complete/1]). +-export([discard/1]). + +-export([list_local/1]). +-export([list_local/2]). +-export([start_reader/3]). + +-export([list/1]). +% -export([list/2]). + +-export_type([export/0]). + +-type options() :: _TODO. +-type transfer() :: emqx_ft:transfer(). +-type filemeta() :: emqx_ft:filemeta(). +-type exportinfo() :: #{ + transfer := transfer(), + name := file:name(), + uri := uri_string:uri_string(), + timestamp := emqx_datetime:epoch_second(), + size := _Bytes :: non_neg_integer(), + meta => filemeta() +}. + +-type file_error() :: emqx_ft_storage_fs:file_error(). + +-opaque export() :: #{ + path := file:name(), + handle := io:device(), + result := file:name(), + meta := filemeta(), + hash := crypto:hash_state() +}. + +-type reader() :: pid(). + +-define(TEMPDIR, "tmp"). +-define(MANIFEST, ".MANIFEST.json"). + +%% NOTE +%% Bucketing of resulting files to accommodate the storage backend for considerably +%% large (e.g. > 10s of millions) amount of files. +-define(BUCKET_HASH, sha). + +%% 2 symbols = at most 256 directories on the upper level +-define(BUCKET1_LEN, 2). +%% 2 symbols = at most 256 directories on the second level +-define(BUCKET2_LEN, 2). + +-define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), + ?SLOG(notice, "filesystem_object_unexpected", #{ + relpath => RelFilepath, + fileinfo => Fileinfo, + options => Options + }) +). + +-define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options), + ?SLOG(warning, "filesystem_object_inaccessible", #{ + relpath => RelFilepath, + reason => Reason, + options => Options + }) +). + +%% + +-spec start_export(options(), transfer(), filemeta()) -> + {ok, export()} | {error, file_error()}. 
+start_export(Options, Transfer, Filemeta = #{name := Filename}) -> + TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename), + ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename), + _ = filelib:ensure_dir(TempFilepath), + case file:open(TempFilepath, [write, raw, binary]) of + {ok, Handle} -> + {ok, #{ + path => TempFilepath, + handle => Handle, + result => ResultFilepath, + meta => Filemeta, + hash => init_checksum(Filemeta) + }}; + {error, _} = Error -> + Error + end. + +-spec write(export(), iodata()) -> + {ok, export()} | {error, file_error()}. +write(Export = #{handle := Handle, hash := Ctx}, IoData) -> + case file:write(Handle, IoData) of + ok -> + {ok, Export#{hash := update_checksum(Ctx, IoData)}}; + {error, _} = Error -> + Error + end. + +-spec complete(export()) -> + ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. +complete( + Export = #{ + path := Filepath, + handle := Handle, + result := ResultFilepath, + meta := FilemetaIn, + hash := Ctx + } +) -> + case verify_checksum(Ctx, FilemetaIn) of + {ok, Filemeta} -> + ok = file:close(Handle), + _ = filelib:ensure_dir(ResultFilepath), + _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)), + file:rename(Filepath, ResultFilepath); + {error, _} = Error -> + _ = discard(Export), + Error + end. + +-spec discard(export()) -> + ok. +discard(#{path := Filepath, handle := Handle}) -> + ok = file:close(Handle), + file:delete(Filepath). + +%% + +-spec list_local(options(), transfer()) -> + {ok, [exportinfo(), ...]} | {error, file_error()}. 
+list_local(Options, Transfer) -> + TransferRoot = mk_absdir(Options, Transfer, result), + case + emqx_ft_fs_util:fold( + fun + (_Path, {error, Reason}, [], []) -> + {error, Reason}; + (_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) -> + RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]), + Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), + [Info | Acc]; + (RelFilepath, Fileinfo = #file_info{}, _, Acc) -> + ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), + Acc; + (RelFilepath, {error, Reason}, _, Acc) -> + ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options), + Acc + end, + [], + TransferRoot, + [fun filter_manifest/1] + ) + of + Infos = [_ | _] -> + {ok, Infos}; + [] -> + {error, enoent}; + {error, Reason} -> + {error, Reason} + end. + +-spec list_local(options()) -> + {ok, #{transfer() => [exportinfo(), ...]}}. +list_local(Options) -> + Pattern = [ + _Bucket1 = '*', + _Bucket2 = '*', + _Rest = '*', + _ClientId = '*', + _FileId = '*', + fun filter_manifest/1 + ], + Root = get_storage_root(Options), + {ok, + emqx_ft_fs_util:fold( + fun(RelFilepath, Info, Stack, Acc) -> + read_exportinfo(Options, RelFilepath, Info, Stack, Acc) + end, + [], + Root, + Pattern + )}. + +filter_manifest(?MANIFEST) -> + % Filename equals `?MANIFEST`, there should also be a manifest for it. + false; +filter_manifest(Filename) -> + ?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing). 
+ +read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) -> + [Filename, FileId, ClientId | _] = Stack, + Transfer = dirnames_to_transfer(ClientId, FileId), + Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), + [Info | Acc]; +read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) -> + ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options), + Acc; +read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) -> + ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options), + Acc. + +mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) -> + Root = get_storage_root(Options), + try_read_filemeta( + filename:join(Root, mk_manifest_filename(RelFilepath)), + #{ + transfer => Transfer, + name => Filename, + uri => mk_export_uri(Options, RelFilepath), + timestamp => Fileinfo#file_info.mtime, + size => Fileinfo#file_info.size, + path => filename:join(Root, RelFilepath) + } + ). + +try_read_filemeta(Filepath, Info) -> + case emqx_ft_fs_util:read_decode_file(Filepath, fun decode_filemeta/1) of + {ok, Filemeta} -> + Info#{meta => Filemeta}; + {error, Reason} -> + ?SLOG(warning, "filemeta_inaccessible", #{ + path => Filepath, + reason => Reason + }), + Info + end. + +mk_export_uri(Options, RelFilepath) -> + % emqx_ft_storage_exporter_fs_api:mk_export_uri(Options, RelFilepath). + emqx_ft_api:mk_file_uri(Options, node(), RelFilepath). + +-spec start_reader(options(), file:name(), _Caller :: pid()) -> + {ok, reader()} | {error, enoent}. +start_reader(Options, Filepath, CallerPid) -> + Root = get_storage_root(Options), + case filelib:safe_relative_path(Filepath, Root) of + SafeFilepath when SafeFilepath /= unsafe -> + AbsFilepath = filename:join(Root, SafeFilepath), + emqx_ft_storage_fs_reader:start_supervised(CallerPid, AbsFilepath); + unsafe -> + {error, enoent} + end. + +%% + +-spec list(options()) -> + {ok, [exportinfo(), ...]} | {error, file_error()}. 
+list(_Options) -> + Nodes = mria_mnesia:running_nodes(), + Results = emqx_ft_storage_fs_proto_v1:list_exports(Nodes), + {GoodResults, BadResults} = lists:partition( + fun + ({_Node, {ok, {ok, _}}}) -> true; + (_) -> false + end, + lists:zip(Nodes, Results) + ), + length(BadResults) > 0 andalso + ?SLOG(warning, #{msg => "list_remote_exports_failed", failures => BadResults}), + {ok, [File || {_Node, {ok, {ok, Files}}} <- GoodResults, File <- Files]}. + +%% + +init_checksum(#{checksum := {Algo, _}}) -> + crypto:hash_init(Algo); +init_checksum(#{}) -> + crypto:hash_init(sha256). + +update_checksum(Ctx, IoData) -> + crypto:hash_update(Ctx, IoData). + +verify_checksum(Ctx, Filemeta = #{checksum := {Algo, Digest}}) -> + case crypto:hash_final(Ctx) of + Digest -> + {ok, Filemeta}; + Mismatch -> + {error, {checksum, Algo, binary:encode_hex(Mismatch)}} + end; +verify_checksum(Ctx, Filemeta = #{}) -> + Digest = crypto:hash_final(Ctx), + {ok, Filemeta#{checksum => {sha256, Digest}}}. + +%% + +-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). + +encode_filemeta(Meta) -> + emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))). + +decode_filemeta(Binary) when is_binary(Binary) -> + ?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]), + case emqx_ft:decode_filemeta(Map) of + {ok, Meta} -> + Meta; + {error, Reason} -> + error(Reason) + end. + +mk_manifest_filename(Filename) when is_list(Filename) -> + Filename ++ ?MANIFEST; +mk_manifest_filename(Filename) when is_binary(Filename) -> + <<Filename/binary, ?MANIFEST>>. + +mk_temp_absfilepath(Options, Transfer, Filename) -> + Unique = erlang:unique_integer([positive]), + TempFilename = integer_to_list(Unique) ++ "." ++ Filename, + filename:join(mk_absdir(Options, Transfer, temporary), TempFilename). + +mk_absdir(Options, _Transfer, temporary) -> + filename:join([get_storage_root(Options), ?TEMPDIR]); +mk_absdir(Options, Transfer, result) -> + filename:join([get_storage_root(Options) | mk_result_reldir(Transfer)]). 
+ +mk_absfilepath(Options, Transfer, What, Filename) -> + filename:join(mk_absdir(Options, Transfer, What), Filename). + +mk_result_reldir(Transfer = {ClientId, FileId}) -> + Hash = mk_transfer_hash(Transfer), + << + Bucket1:?BUCKET1_LEN/binary, + Bucket2:?BUCKET2_LEN/binary, + BucketRest/binary + >> = binary:encode_hex(Hash), + [Bucket1, Bucket2, BucketRest, ClientId, FileId]. + +mk_transfer_hash(Transfer) -> + crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)). + +get_storage_root(Options) -> + maps:get(root, Options, filename:join([emqx:data_dir(), "ft", "exports"])). diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index b8aef5276..4a613dbcf 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -29,6 +29,7 @@ -export([child_spec/1]). +% Segments-related API -export([store_filemeta/3]). -export([store_segment/3]). -export([read_filemeta/2]). @@ -43,22 +44,20 @@ -export([get_subdir/2]). -export([get_subdir/3]). --export([ready_transfers_local/1]). --export([get_ready_transfer_local/3]). +-export([exporter/1]). --export([ready_transfers/1]). --export([get_ready_transfer/2]). - --export([open_file/3]). --export([complete/4]). --export([write/2]). --export([discard/1]). +% Exporter-specific API +-export([exports/1]). +-export([exports_local/1]). +-export([exports_local/2]). -export_type([storage/0]). -export_type([filefrag/1]). -export_type([filefrag/0]). -export_type([transferinfo/0]). +-export_type([file_error/0]). + -type transfer() :: emqx_ft:transfer(). -type offset() :: emqx_ft:offset(). -type filemeta() :: emqx_ft:filemeta(). @@ -70,8 +69,7 @@ }. -type transferinfo() :: #{ - status := complete | incomplete, - result => [filefrag({result, #{}})] + filemeta => filemeta() }. % TODO naming @@ -85,16 +83,20 @@ -type filefrag() :: filefrag( {filemeta, filemeta()} | {segment, segmentinfo()} - | {result, #{}} ). -define(FRAGDIR, frags). -define(TEMPDIR, tmp). 
--define(RESULTDIR, result). -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). -type storage() :: #{ + root => file:name(), + exporter => exporter() +}. + +-type exporter() :: #{ + type := local, root => file:name() }. @@ -138,7 +140,9 @@ store_filemeta(Storage, Transfer, Meta) -> % about it too much now. {error, conflict}; {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent -> - write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta)) + write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta)); + {error, _} = Error -> + Error end. %% Store a segment in the backing filesystem. @@ -153,17 +157,17 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> write_file_atomic(Storage, Transfer, Filepath, Content). -spec read_filemeta(storage(), transfer()) -> - {ok, filefrag({filemeta, filemeta()})} | {error, corrupted} | {error, file_error()}. + {ok, filemeta()} | {error, corrupted} | {error, file_error()}. read_filemeta(Storage, Transfer) -> Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST), read_file(Filepath, fun decode_filemeta/1). --spec list(storage(), transfer(), _What :: fragment | result) -> +-spec list(storage(), transfer(), _What :: fragment) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, file_error()}. -list(Storage, Transfer, What) -> +list(Storage, Transfer, What = fragment) -> Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)), case file:list_dir(Dirname) of {ok, Filenames} -> @@ -172,18 +176,13 @@ list(Storage, Transfer, What) -> % extremely bad luck is needed for that, e.g. concurrent assemblers with % different filemetas from different nodes). This might be unexpected for a % client given the current protocol, yet might be helpful in the future. 
- {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)}; + {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)}; {error, enoent} -> {ok, []}; {error, _} = Error -> Error end. -get_filefrag_fun_for(fragment) -> - fun mk_filefrag/2; -get_filefrag_fun_for(result) -> - fun mk_result_filefrag/2. - -spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}. pread(_Storage, _Transfer, Frag, Offset, Size) -> @@ -213,102 +212,28 @@ assemble(Storage, Transfer, Size) -> {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), {async, Pid}. -get_ready_transfer(_Storage, ReadyTransferId) -> - case parse_ready_transfer_id(ReadyTransferId) of - {ok, {Node, Transfer}} -> - try - case emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, self(), Transfer) of - {ok, ReaderPid} -> - {ok, emqx_ft_storage_fs_reader:table(ReaderPid)}; - {error, _} = Error -> - Error - end - catch - error:Exc:Stacktrace -> - ?SLOG(warning, #{ - msg => "get_ready_transfer_error", - node => Node, - transfer => Transfer, - exception => Exc, - stacktrace => Stacktrace - }), - {error, Exc}; - C:Exc:Stacktrace -> - ?SLOG(warning, #{ - msg => "get_ready_transfer_fail", - class => C, - node => Node, - transfer => Transfer, - exception => Exc, - stacktrace => Stacktrace - }), - {error, {C, Exc}} - end; - {error, _} = Error -> - Error +%% + +-spec exporter(storage()) -> {module(), _ExporterOptions}. +exporter(Storage) -> + case maps:get(exporter, Storage) of + #{type := local} = Options -> + {emqx_ft_storage_exporter_fs, Options} end. -get_ready_transfer_local(Storage, CallerPid, Transfer) -> - Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)), - case file:list_dir(Dirname) of - {ok, [Filename | _]} -> - FullFilename = filename:join([Dirname, Filename]), - emqx_ft_storage_fs_reader:start_supervised(CallerPid, FullFilename); - {error, _} = Error -> - Error - end. 
+exports(Storage) -> + {ExporterMod, ExporterOpts} = exporter(Storage), + ExporterMod:list(ExporterOpts). -ready_transfers(_Storage) -> - Nodes = mria_mnesia:running_nodes(), - Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes), - {GoodResults, BadResults} = lists:partition( - fun - ({ok, _}) -> true; - (_) -> false - end, - Results - ), - case {GoodResults, BadResults} of - {[], _} -> - ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), - {error, no_nodes}; - {_, []} -> - {ok, [File || {ok, Files} <- GoodResults, File <- Files]}; - {_, _} -> - ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), - {ok, [File || {ok, Files} <- GoodResults, File <- Files]} - end. +exports_local(Storage) -> + {ExporterMod, ExporterOpts} = exporter(Storage), + ExporterMod:list_local(ExporterOpts). -ready_transfers_local(Storage) -> - {ok, Transfers} = transfers(Storage), - lists:filtermap( - fun - ({Transfer, #{status := complete, result := [Result | _]}}) -> - {true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}}; - (_) -> - false - end, - maps:to_list(Transfers) - ). +exports_local(Storage, Transfer) -> + {ExporterMod, ExporterOpts} = exporter(Storage), + ExporterMod:list_local(ExporterOpts, Transfer). -ready_transfer_id({ClientId, FileId}) -> - #{ - <<"node">> => atom_to_binary(node()), - <<"clientid">> => ClientId, - <<"fileid">> => FileId - }. - -parse_ready_transfer_id(#{ - <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId -}) -> - case emqx_misc:safe_to_existing_atom(NodeBin) of - {ok, Node} -> - {ok, {Node, {ClientId, FileId}}}; - {error, _} -> - {error, {invalid_node, NodeBin}} - end; -parse_ready_transfer_id(#{}) -> - {error, invalid_file_id}. +%% -spec transfers(storage()) -> {ok, #{transfer() => transferinfo()}}. @@ -345,17 +270,16 @@ transfers(Storage, ClientId, AccIn) -> end. 
read_transferinfo(Storage, Transfer, Acc) -> - case list(Storage, Transfer, result) of - {ok, Result = [_ | _]} -> - Info = #{status => complete, result => Result}, - Acc#{Transfer => Info}; - {ok, []} -> - Info = #{status => incomplete}, - Acc#{Transfer => Info}; - {error, _Reason} -> - ?tp(warning, "list_result_failed", #{ + case read_filemeta(Storage, Transfer) of + {ok, Filemeta} -> + Acc#{Transfer => #{filemeta => Filemeta}}; + {error, enoent} -> + Acc#{Transfer => #{}}; + {error, Reason} -> + ?tp(warning, "read_transferinfo_failed", #{ storage => Storage, - transfer => Transfer + transfer => Transfer, + reason => Reason }), Acc end. @@ -365,7 +289,7 @@ read_transferinfo(Storage, Transfer, Acc) -> get_subdir(Storage, Transfer) -> mk_filedir(Storage, Transfer, []). --spec get_subdir(storage(), transfer(), fragment | temporary | result) -> +-spec get_subdir(storage(), transfer(), fragment | temporary) -> file:name(). get_subdir(Storage, Transfer, What) -> mk_filedir(Storage, Transfer, get_subdirs_for(What)). @@ -373,84 +297,12 @@ get_subdir(Storage, Transfer, What) -> get_subdirs_for(fragment) -> [?FRAGDIR]; get_subdirs_for(temporary) -> - [?TEMPDIR]; -get_subdirs_for(result) -> - [?RESULTDIR]. - -%% - --type handle() :: {file:name(), io:device(), crypto:hash_state()}. - --spec open_file(storage(), transfer(), filemeta()) -> - {ok, handle()} | {error, file_error()}. -open_file(Storage, Transfer, Filemeta) -> - Filename = maps:get(name, Filemeta), - TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), - _ = filelib:ensure_dir(TempFilepath), - case file:open(TempFilepath, [write, raw, binary]) of - {ok, Handle} -> - % TODO: preserve filemeta - {ok, {TempFilepath, Handle, init_checksum(Filemeta)}}; - {error, _} = Error -> - Error - end. - --spec write(handle(), iodata()) -> - {ok, handle()} | {error, file_error()}. 
-write({Filepath, IoDevice, Ctx}, IoData) -> - case file:write(IoDevice, IoData) of - ok -> - {ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}}; - {error, _} = Error -> - Error - end. - --spec complete(storage(), transfer(), filemeta(), handle()) -> - ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. -complete(Storage, Transfer, Filemeta = #{name := Filename}, Handle = {Filepath, IoDevice, Ctx}) -> - TargetFilepath = mk_filepath(Storage, Transfer, get_subdirs_for(result), Filename), - case verify_checksum(Ctx, Filemeta) of - ok -> - ok = file:close(IoDevice), - mv_temp_file(Filepath, TargetFilepath); - {error, _} = Error -> - _ = discard(Handle), - Error - end. - --spec discard(handle()) -> - ok. -discard({Filepath, IoDevice, _Ctx}) -> - ok = file:close(IoDevice), - file:delete(Filepath). - -init_checksum(#{checksum := {Algo, _}}) -> - crypto:hash_init(Algo); -init_checksum(#{}) -> - undefined. - -update_checksum(Ctx, IoData) when Ctx /= undefined -> - crypto:hash_update(Ctx, IoData); -update_checksum(undefined, _IoData) -> - undefined. - -verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined -> - case crypto:hash_final(Ctx) of - Digest -> - ok; - Mismatch -> - {error, {checksum, Algo, binary:encode_hex(Mismatch)}} - end; -verify_checksum(undefined, _) -> - ok. + [?TEMPDIR]. -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). encode_filemeta(Meta) -> - % TODO: Looks like this should be hocon's responsibility. - Schema = emqx_ft_schema:schema(filemeta), - Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}), - emqx_json:encode(?PRELUDE(_Vsn = 1, Term)). + emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))). decode_filemeta(Binary) when is_binary(Binary) -> ?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]), @@ -490,33 +342,12 @@ try_list_dir(Dirname) -> end. 
get_storage_root(Storage) -> - maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). + maps:get(root, Storage, filename:join([emqx:data_dir(), "ft", "transfers"])). -include_lib("kernel/include/file.hrl"). -read_file(Filepath) -> - file:read_file(Filepath). - read_file(Filepath, DecodeFun) -> - case read_file(Filepath) of - {ok, Content} -> - safe_decode(Content, DecodeFun); - {error, _} = Error -> - Error - end. - -safe_decode(Content, DecodeFun) -> - try - {ok, DecodeFun(Content)} - catch - C:E:Stacktrace -> - ?tp(warning, "safe_decode_failed", #{ - class => C, - exception => E, - stacktrace => Stacktrace - }), - {error, corrupted} - end. + emqx_ft_fs_util:read_decode_file(Filepath, DecodeFun). write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) -> TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)), @@ -574,12 +405,6 @@ mk_filefrag(_Dirname, _Filename) -> }), false. -mk_result_filefrag(Dirname, Filename) -> - % NOTE - % Any file in the `?RESULTDIR` subdir is currently considered the result of - % the file transfer. - mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end). - mk_filefrag(Dirname, Filename, Tag, Fun) -> Filepath = filename:join(Dirname, Filename), % TODO error handling? diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl index 58c5dbfdf..20e4f468d 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -308,28 +308,18 @@ is_same_filepath(P1, P2) when is_binary(P1) -> filepath_to_binary(S) -> unicode:characters_to_binary(S, unicode, file:native_name_encoding()). -get_segments_ttl(Storage, Transfer) -> +get_segments_ttl(Storage, TransferInfo) -> {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage), - clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(Storage, Transfer)). + clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)). 
-try_get_filemeta_ttl(Storage, Transfer) -> - case emqx_ft_storage_fs:read_filemeta(Storage, Transfer) of - {ok, Filemeta} -> - maps:get(segments_ttl, Filemeta, undefined); - {error, _} -> - undefined - end. +try_get_filemeta_ttl(#{filemeta := Filemeta}) -> + maps:get(segments_ttl, Filemeta, undefined); +try_get_filemeta_ttl(#{}) -> + undefined. clamp(Min, Max, V) -> min(Max, max(Min, V)). -% try_collect(_Subject, ok = Result, Then, _Stats) -> -% Then(Result); -% try_collect(_Subject, {ok, Result}, Then, _Stats) -> -% Then(Result); -% try_collect(Subject, {error, _} = Error, _Then, Stats) -> -% register_gcstat_error(Subject, Error, Stats). - %% init_gcstats() -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl index 7e19dd322..dbd0cd6dc 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl @@ -23,8 +23,8 @@ -export([ list_local/2, pread_local/4, - get_ready_transfer_local/2, - ready_transfers_local/0 + list_exports_local/0, + read_export_file_local/2 ]). list_local(Transfer, What) -> @@ -33,8 +33,18 @@ list_local(Transfer, What) -> pread_local(Transfer, Frag, Offset, Size) -> emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]). -get_ready_transfer_local(CallerPid, Transfer) -> - emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [CallerPid, Transfer]). +list_exports_local() -> + case emqx_ft_storage:with_storage_type(local, exporter, []) of + {emqx_ft_storage_exporter_fs, Options} -> + emqx_ft_storage_exporter_fs:list_local(Options); + InvalidExporter -> + {error, {invalid_exporter, InvalidExporter}} + end. -ready_transfers_local() -> - emqx_ft_storage:with_storage_type(local, ready_transfers_local, []). 
+read_export_file_local(Filepath, CallerPid) -> + case emqx_ft_storage:with_storage_type(local, exporter, []) of + {emqx_ft_storage_exporter_fs, Options} -> + emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid); + InvalidExporter -> + {error, {invalid_exporter, InvalidExporter}} + end. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl index e2c4c93d7..f152928fe 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl @@ -22,8 +22,10 @@ -export([multilist/3]). -export([pread/5]). --export([ready_transfers/1]). --export([get_ready_transfer/3]). + +%% TODO: These should be defined in a separate BPAPI +-export([list_exports/1]). +-export([read_export_file/3]). -type offset() :: emqx_ft:offset(). -type transfer() :: emqx_ft:transfer(). @@ -44,19 +46,16 @@ multilist(Nodes, Transfer, What) -> pread(Node, Transfer, Frag, Offset, Size) -> erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). --spec ready_transfers([node()]) -> - [ - {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} - | {error, term()} - | {exit, term()} - | {throw, term()} - ]. -ready_transfers(Nodes) -> - erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []). +%% --spec get_ready_transfer(node(), pid(), emqx_ft_storage:ready_transfer_id()) -> - {ok, emqx_ft_storage:ready_transfer_data()} +-spec list_exports([node()]) -> + emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]). +list_exports(Nodes) -> + erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_exports_local, []). + +-spec read_export_file(node(), file:name(), pid()) -> + {ok, emqx_ft_storage:export_data()} | {error, term()} | no_return(). 
-get_ready_transfer(Node, CallerPid, ReadyTransferId) -> - erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [CallerPid, ReadyTransferId]). +read_export_file(Node, Filepath, CallerPid) -> + erpc:call(Node, emqx_ft_storage_fs_proxy, read_export_file_local, [Filepath, CallerPid]). diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 9c9087732..5fb384c16 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -58,8 +58,9 @@ end_per_suite(_Config) -> set_special_configs(Config) -> fun (emqx_ft) -> - Root = emqx_ft_test_helpers:ft_root(Config, node()), - emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}}); + emqx_ft_test_helpers:load_config(#{ + storage => emqx_ft_test_helpers:local_storage(Config) + }); (_) -> ok end. @@ -108,8 +109,9 @@ mk_cluster_specs(Config) -> {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, {env_handler, fun (emqx_ft) -> - Root = emqx_ft_test_helpers:ft_root(Config, node()), - emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}}); + emqx_ft_test_helpers:load_config(#{ + storage => emqx_ft_test_helpers:local_storage(Config) + }); (_) -> ok end} @@ -194,11 +196,10 @@ t_simple_transfer(Config) -> emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) ), - [ReadyTransferId] = list_ready_transfers(?config(clientid, Config)), - {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), + [Export] = list_exports(?config(clientid, Config)), ?assertEqual( - iolist_to_binary(Data), - iolist_to_binary(qlc:eval(TableQH)) + {ok, iolist_to_binary(Data)}, + read_export(Export) ). 
t_meta_conflict(Config) -> @@ -424,11 +425,10 @@ t_switch_node(Config) -> %% Now check consistency of the file - [ReadyTransferId] = list_ready_transfers(ClientId), - {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), + [Export] = list_exports(ClientId), ?assertEqual( - iolist_to_binary(Data), - iolist_to_binary(qlc:eval(TableQH)) + {ok, iolist_to_binary(Data)}, + read_export(Export) ). t_assemble_crash(Config) -> @@ -501,27 +501,30 @@ t_unreliable_migrating_client(Config) -> ], _Context = run_commands(Commands, Context), - ReadyTransferIds = list_ready_transfers(?config(clientid, Config)), + Exports = list_exports(?config(clientid, Config)), % NOTE % The cluster had 2 assemblers running on two different nodes, because client sent `fin` % twice. This is currently expected, files must be identical anyway. - Node1Bin = atom_to_binary(Node1), - NodeSelfBin = atom_to_binary(NodeSelf), + Node1Str = atom_to_list(Node1), + NodeSelfStr = atom_to_list(NodeSelf), ?assertMatch( - [#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}], - lists:sort(ReadyTransferIds) + [#{"node" := Node1Str}, #{"node" := NodeSelfStr}], + lists:map( + fun(#{uri := URIString}) -> + #{query := QS} = uri_string:parse(URIString), + uri_string:dissect_query(QS) + end, + lists:sort(Exports) + ) ), [ - begin - {ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id), - ?assertEqual( - Payload, - iolist_to_binary(qlc:eval(TableQH)) - ) - end - || Id <- ReadyTransferIds + ?assertEqual( + {ok, Payload}, + read_export(Export) + ) + || Export <- Exports ]. run_commands(Commands, Context) -> @@ -620,10 +623,10 @@ meta(FileName, Data) -> size => byte_size(FullData) }. -list_ready_transfers(ClientId) -> - {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), - [ - Id - || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, - CId == ClientId - ]. 
+list_exports(ClientId) -> + {ok, Exports} = emqx_ft_storage:exports(), + [Export || Export = #{transfer := {CId, _}} <- Exports, CId == ClientId]. + +read_export(#{path := AbsFilepath}) -> + % TODO: only works for the local filesystem exporter right now + file:read_file(AbsFilepath). diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 7b191e229..aff4d864c 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -24,7 +24,7 @@ -include_lib("emqx/include/asserts.hrl"). --import(emqx_mgmt_api_test_util, [request/3, uri/1]). +-import(emqx_mgmt_api_test_util, [uri/1]). all() -> emqx_common_test_helpers:all(?MODULE). @@ -41,8 +41,9 @@ end_per_suite(_Config) -> set_special_configs(Config) -> fun (emqx_ft) -> - Root = emqx_ft_test_helpers:ft_root(Config, node()), - emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}}); + emqx_ft_test_helpers:load_config(#{ + storage => emqx_ft_test_helpers:local_storage(Config) + }); (_) -> ok end. @@ -61,40 +62,25 @@ t_list_ready_transfers(Config) -> ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()), - {ok, 200, Response} = request(get, uri(["file_transfer", "files"])), - - #{<<"files">> := Files} = emqx_json:decode(Response, [return_maps]), + {ok, 200, #{<<"files">> := Files}} = + request(get, uri(["file_transfer", "files"]), fun json/1), ?assertInclude( - #{<<"id">> := #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}}, + #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}, Files ). -%% This shouldn't happen in real life -%% but we need to test it anyway -t_list_ready_transfers_no_nodes(_Config) -> - _ = meck:new(mria_mnesia, [passthrough]), - _ = meck:expect(mria_mnesia, running_nodes, fun() -> [] end), - - ?assertMatch( - {ok, 503, _}, - request(get, uri(["file_transfer", "files"])) - ). 
- t_download_transfer(Config) -> ClientId = client_id(Config), ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()), ?assertMatch( - {ok, 503, _}, + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request( get, - uri(["file_transfer", "file"]) ++ - query(#{ - clientid => ClientId, - fileid => <<"f1">> - }) + uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}), + fun json/1 ) ), @@ -104,8 +90,7 @@ t_download_transfer(Config) -> get, uri(["file_transfer", "file"]) ++ query(#{ - clientid => ClientId, - fileid => <<"f1">>, + fileref => <<"f1">>, node => <<"nonode@nohost">> }) ) @@ -117,22 +102,16 @@ t_download_transfer(Config) -> get, uri(["file_transfer", "file"]) ++ query(#{ - clientid => ClientId, - fileid => <<"unknown_file">>, + fileref => <<"unknown_file">>, node => node() }) ) ), - {ok, 200, Response} = request( - get, - uri(["file_transfer", "file"]) ++ - query(#{ - clientid => ClientId, - fileid => <<"f1">>, - node => node() - }) - ), + {ok, 200, #{<<"files">> := [File]}} = + request(get, uri(["file_transfer", "files"]), fun json/1), + + {ok, 200, Response} = request(get, uri([]) ++ maps:get(<<"uri">>, File)), ?assertEqual( <<"data">>, @@ -147,7 +126,18 @@ client_id(Config) -> atom_to_binary(?config(tc, Config), utf8). request(Method, Url) -> - request(Method, Url, []). + emqx_mgmt_api_test_util:request(Method, Url, []). + +request(Method, Url, Decoder) when is_function(Decoder) -> + case emqx_mgmt_api_test_util:request(Method, Url, []) of + {ok, Code, Body} -> + {ok, Code, Decoder(Body)}; + Otherwise -> + Otherwise + end. + +json(Body) when is_binary(Body) -> + emqx_json:decode(Body, [return_maps]). 
query(Params) -> KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)), diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 8df471deb..4c7c0f08c 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -47,6 +47,7 @@ init_per_testcase(TC, Config) -> {ok, Pid} = emqx_ft_assembler_sup:start_link(), [ {storage_root, "file_transfer_root"}, + {exports_root, "file_transfer_exports"}, {file_id, atom_to_binary(TC)}, {assembler_sup, Pid} | Config @@ -85,11 +86,12 @@ t_assemble_empty_transfer(Config) -> ), Status = complete_assemble(Storage, Transfer, 0), ?assertEqual({shutdown, ok}, Status), - {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result), - ?assertEqual( - {error, eof}, - emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size) - ), + {ok, [_Result = #{size := _Size = 0}]} = + emqx_ft_storage_fs:exports_local(Storage, Transfer), + % ?assertEqual( + % {error, eof}, + % emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size) + % ), ok. 
t_assemble_complete_local_transfer(Config) -> @@ -133,12 +135,13 @@ t_assemble_complete_local_transfer(Config) -> {ok, [ #{ size := TransferSize, - fragment := {result, #{}} + meta := #{} } ]}, - emqx_ft_storage_fs:list(Storage, Transfer, result) + emqx_ft_storage_fs:exports_local(Storage, Transfer) ), - {ok, [#{path := AssemblyFilename}]} = emqx_ft_storage_fs:list(Storage, Transfer, result), + {ok, [#{path := AssemblyFilename}]} = + emqx_ft_storage_fs:exports_local(Storage, Transfer), ?assertMatch( {ok, #file_info{type = regular, size = TransferSize}}, file:read_file_info(AssemblyFilename) @@ -191,18 +194,23 @@ complete_assemble(Storage, Transfer, Size, Timeout) -> t_list_transfers(Config) -> Storage = storage(Config), + {ok, Exports} = emqx_ft_storage_fs:exports_local(Storage), ?assertMatch( - {ok, #{ - {?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{ - status := complete, - result := [#{path := _, size := 0, fragment := {result, _}}] + [ + #{ + transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>}, + path := _, + size := Size, + meta := #{name := "topsecret.pdf"} }, - {?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{ - status := complete, - result := [#{path := _, size := Size, fragment := {result, _}}] + #{ + transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>}, + path := _, + size := 0, + meta := #{name := "important.pdf"} } - }} when Size > 0, - emqx_ft_storage_fs:transfers(Storage) + ] when Size > 0, + lists:sort(Exports) ). %% @@ -232,5 +240,9 @@ mk_fileid() -> storage(Config) -> #{ type => local, - root => ?config(storage_root, Config) + root => ?config(storage_root, Config), + exporter => #{ + type => local, + root => ?config(exports_root, Config) + } }. 
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 314b4a5f2..e43abf095 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -56,7 +56,16 @@ t_update_config(_Config) -> {ok, _}, emqx_conf:update( [file_transfer], - #{<<"storage">> => #{<<"type">> => <<"local">>, <<"root">> => <<"/tmp/path">>}}, + #{ + <<"storage">> => #{ + <<"type">> => <<"local">>, + <<"root">> => <<"/tmp/path">>, + <<"exporter">> => #{ + <<"type">> => <<"local">>, + <<"root">> => <<"/tmp/exports">> + } + } + }, #{} ) ), diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index 5551cce27..5b3f56b7d 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -22,11 +22,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("emqx/include/asserts.hrl"). - all() -> [ - {group, single_node}, {group, cluster} ]. @@ -34,7 +31,6 @@ all() -> groups() -> [ - {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- ?CLUSTER_CASES}, {cluster, [sequence], ?CLUSTER_CASES} ]. @@ -48,8 +44,9 @@ end_per_suite(_Config) -> set_special_configs(Config) -> fun (emqx_ft) -> - Root = emqx_ft_test_helpers:ft_root(Config, node()), - emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}}); + emqx_ft_test_helpers:load_config(#{ + storage => emqx_ft_test_helpers:local_storage(Config) + }); (_) -> ok end. 
@@ -74,47 +71,19 @@ end_per_group(_Group, _Config) -> %% Tests %%-------------------------------------------------------------------- -t_invalid_ready_transfer_id(Config) -> - ?assertMatch( - {error, _}, - emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ - <<"clientid">> => client_id(Config), - <<"fileid">> => <<"fileid">>, - <<"node">> => atom_to_binary('nonexistent@127.0.0.1') - }) - ), - ?assertMatch( - {error, _}, - emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ - <<"clientid">> => client_id(Config), - <<"fileid">> => <<"fileid">>, - <<"node">> => <<"nonexistent_as_atom@127.0.0.1">> - }) - ), - ?assertMatch( - {error, _}, - emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{ - <<"clientid">> => client_id(Config), - <<"fileid">> => <<"nonexistent_file">>, - <<"node">> => node() - }) - ). - t_multinode_ready_transfers(Config) -> Node1 = ?config(additional_node, Config), - ok = emqx_ft_test_helpers:upload_file(<<"c1">>, <<"f1">>, <<"data">>, Node1), + ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1), Node2 = node(), - ok = emqx_ft_test_helpers:upload_file(<<"c2">>, <<"f2">>, <<"data">>, Node2), + ok = emqx_ft_test_helpers:upload_file(<<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2), - ?assertInclude( - #{<<"clientid">> := <<"c1">>, <<"fileid">> := <<"f1">>}, - ready_transfer_ids(Config) - ), - - ?assertInclude( - #{<<"clientid">> := <<"c2">>, <<"fileid">> := <<"f2">>}, - ready_transfer_ids(Config) + ?assertMatch( + [ + #{transfer := {<<"c/1">>, <<"f:1">>}, name := "fn1"}, + #{transfer := {<<"c/2">>, <<"f:2">>}, name := "fn2"} + ], + lists:sort(list_exports(Config)) ). 
%%-------------------------------------------------------------------- @@ -127,13 +96,13 @@ client_id(Config) -> storage(Config) -> #{ type => local, - root => ft_root(Config) + root => emqx_ft_test_helpers:root(Config, node(), ["transfers"]), + exporter => #{ + type => local, + root => emqx_ft_test_helpers:root(Config, node(), ["exports"]) + } }. -ft_root(Config) -> - emqx_ft_test_helpers:ft_root(Config, node()). - -ready_transfer_ids(Config) -> - {ok, ReadyTransfers} = emqx_ft_storage_fs:ready_transfers(storage(Config)), - {ReadyTransferIds, _} = lists:unzip(ReadyTransfers), - ReadyTransferIds. +list_exports(Config) -> + {ok, Exports} = emqx_ft_storage_fs:exports(storage(Config)), + Exports. diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 89e9eb970..25cd200f1 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -41,7 +41,14 @@ init_per_testcase(TC, Config) -> emqx_ft, fun(emqx_ft) -> emqx_ft_test_helpers:load_config(#{ - storage => #{type => local, root => mk_root(TC, Config)} + storage => #{ + type => local, + root => emqx_ft_test_helpers:root(Config, node(), [TC, transfers]), + exporter => #{ + type => local, + root => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) + } + } }) end ), @@ -53,9 +60,6 @@ end_per_testcase(_TC, _Config) -> ok = application:stop(emqx_ft), ok. -mk_root(TC, Config) -> - filename:join([?config(priv_dir, Config), "file_transfer", TC, atom_to_list(node())]). - %% -define(NSEGS(Filesize, SegmentSize), (ceil(Filesize / SegmentSize) + 1)). 
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index b756f8034..e62c74d81 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -30,7 +30,7 @@ start_additional_node(Config, Name) -> {configure_gen_rpc, true}, {env_handler, fun (emqx_ft) -> - load_config(#{storage => #{type => local, root => ft_root(Config, node())}}); + load_config(#{storage => local_storage(Config)}); (_) -> ok end} @@ -43,6 +43,13 @@ stop_additional_node(Node) -> ok = emqx_common_test_helpers:stop_slave(Node), ok. +local_storage(Config) -> + #{ + type => local, + root => root(Config, node(), [transfers]), + exporter => #{type => local, root => root(Config, node(), [exports])} + }. + load_config(Config) -> emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}). @@ -50,10 +57,8 @@ tcp_port(Node) -> {_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. -ft_root(Config, Node) -> - filename:join([ - ?config(priv_dir, Config), <<"file_transfer">>, atom_to_binary(Node) - ]). +root(Config, Node, Tail) -> + filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]). upload_file(ClientId, FileId, Data, Node) -> Port = tcp_port(Node),