diff --git a/apps/emqx_ft/etc/emqx_ft.conf b/apps/emqx_ft/etc/emqx_ft.conf
index 250dca6a9..8d921e79c 100644
--- a/apps/emqx_ft/etc/emqx_ft.conf
+++ b/apps/emqx_ft/etc/emqx_ft.conf
@@ -1,5 +1,8 @@
file_transfer {
storage {
type = local
+ exporter {
+ type = local
+ }
}
}
diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf
index dd2d2a1dc..15c42dcfa 100644
--- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf
+++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf
@@ -13,7 +13,7 @@ emqx_ft_schema {
local_type {
desc {
- en: "Use local file system to store uploaded files and temporary data."
+ en: "Use local file system to store uploaded fragments and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
}
label: {
@@ -24,7 +24,7 @@ emqx_ft_schema {
local_storage_root {
desc {
- en: "File system path to keep uploaded files and temporary data."
+ en: "File system path to keep uploaded fragments and temporary data."
zh: "保存上传文件和临时数据的文件系统路径。"
}
label: {
@@ -33,6 +33,40 @@ emqx_ft_schema {
}
}
+ local_storage_exporter {
+ desc {
+ en: "Exporter for the local file system storage backend.
"
+ "Exporter defines where and how fully transferred and assembled files are stored."
+ zh: ""
+ }
+ label: {
+ en: "Local Storage Exporter"
+ zh: ""
+ }
+ }
+
+ local_storage_exporter_type {
+ desc {
+ en: "Type of the Exporter to use."
+ zh: ""
+ }
+ label: {
+ en: "Local Storage Exporter Type"
+ zh: ""
+ }
+ }
+
+ local_storage_exporter_root {
+ desc {
+ en: "File system path to keep uploaded files."
+ zh: ""
+ }
+ label: {
+ en: "Local Filesystem Exporter Root"
+ zh: ""
+ }
+ }
+
local_storage_gc {
desc {
en: "Garbage collection settings for the intermediate and temporary files in the local file system."
diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index 518807d9f..b7c8f0eac 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -33,7 +33,8 @@
]).
-export([
- decode_filemeta/1
+ decode_filemeta/1,
+ encode_filemeta/1
]).
-export([on_complete/4]).
@@ -114,6 +115,11 @@ decode_filemeta(Map) when is_map(Map) ->
{error, {invalid_filemeta, Error}}
end.
+encode_filemeta(Meta = #{}) ->
+ % TODO: Looks like this should be hocon's responsibility.
+ Schema = emqx_ft_schema:schema(filemeta),
+ hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}).
+
%%--------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------
diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl
index ddc6e761a..143d629de 100644
--- a/apps/emqx_ft/src/emqx_ft_api.erl
+++ b/apps/emqx_ft/src/emqx_ft_api.erl
@@ -40,6 +40,10 @@
'/file_transfer/file'/2
]).
+-export([
+ mk_file_uri/3
+]).
+
-import(hoconsc, [mk/2, ref/1, ref/2]).
namespace() -> "file_transfer".
@@ -69,6 +73,11 @@ schema("/file_transfer/files") ->
}
};
schema("/file_transfer/file") ->
+ % TODO
+ % This is conceptually another API, because this logic is inherent only to the
+ % local filesystem exporter. Ideally, we won't even publish it if `emqx_ft` is
+ % configured with another exporter. Accordingly, things that look too specific
+ % for this module (i.e. `parse_filepath/1`) should go away in another API module.
#{
'operationId' => '/file_transfer/file',
get => #{
@@ -77,8 +86,7 @@ schema("/file_transfer/file") ->
description => ?DESC("file_get"),
parameters => [
ref(file_node),
- ref(file_clientid),
- ref(file_id)
+ ref(file_ref)
],
responses => #{
200 => <<"Operation success">>,
@@ -91,32 +99,40 @@ schema("/file_transfer/file") ->
}.
'/file_transfer/files'(get, #{}) ->
- case emqx_ft_storage:ready_transfers() of
+ case emqx_ft_storage:exports() of
{ok, Transfers} ->
- FormattedTransfers = lists:map(
- fun({Id, Info}) ->
- #{id => Id, info => format_file_info(Info)}
- end,
- Transfers
- ),
- {200, #{<<"files">> => FormattedTransfers}};
+ {200, #{<<"files">> => lists:map(fun format_export_info/1, Transfers)}};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
'/file_transfer/file'(get, #{query_string := Query}) ->
- case emqx_ft_storage:get_ready_transfer(Query) of
- {ok, FileData} ->
- {200,
- #{
- <<"content-type">> => <<"application/data">>,
- <<"content-disposition">> => <<"attachment">>
- },
- FileData};
- {error, enoent} ->
- {404, error_msg('NOT_FOUND', <<"Not found">>)};
- {error, Error} ->
- ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
+ try
+ Node = parse_node(maps:get(<<"node">>, Query)),
+ Filepath = parse_filepath(maps:get(<<"fileref">>, Query)),
+ case emqx_ft_storage_fs_proto_v1:read_export_file(Node, Filepath, self()) of
+ {ok, ReaderPid} ->
+ FileData = emqx_ft_storage_fs_reader:table(ReaderPid),
+ {200,
+ #{
+ <<"content-type">> => <<"application/data">>,
+ <<"content-disposition">> => <<"attachment">>
+ },
+ FileData};
+ {error, enoent} ->
+ {404, error_msg('NOT_FOUND', <<"Not found">>)};
+ {error, Error} ->
+ ?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
+ {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
+ end
+ catch
+ throw:{invalid, Param} ->
+ {404,
+ error_msg(
+ 'NOT_FOUND',
+ iolist_to_binary(["Invalid query parameter: ", Param])
+ )};
+ error:{erpc, noconnection} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
@@ -124,46 +140,100 @@ error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
-spec fields(hocon_schema:name()) -> hocon_schema:fields().
+fields(file_ref) ->
+ [
+ {fileref,
+ hoconsc:mk(binary(), #{
+ in => query,
+ desc => <<"File reference">>,
+ example => <<"file1">>,
+ required => true
+ })}
+ ];
fields(file_node) ->
- Desc = <<"File Node">>,
- Meta = #{
- in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false
- },
- [{node, hoconsc:mk(binary(), Meta)}];
-fields(file_clientid) ->
- Desc = <<"File ClientId">>,
- Meta = #{
- in => query, desc => Desc, example => <<"client1">>, required => false
- },
- [{clientid, hoconsc:mk(binary(), Meta)}];
-fields(file_id) ->
- Desc = <<"File">>,
- Meta = #{
- in => query, desc => Desc, example => <<"file1">>, required => false
- },
- [{fileid, hoconsc:mk(binary(), Meta)}].
+ [
+ {node,
+ hoconsc:mk(binary(), #{
+ in => query,
+ desc => <<"Node under which the file is located">>,
+ example => atom_to_list(node()),
+ required => true
+ })}
+ ].
roots() ->
[
file_node,
- file_clientid,
- file_id
+ file_ref
+ ].
+
+mk_file_uri(_Options, Node, Filepath) ->
+ % TODO: qualify with `?BASE_PATH`
+ [
+ "/file_transfer/file?",
+ uri_string:compose_query([
+ {"node", atom_to_list(Node)},
+ {"fileref", Filepath}
+ ])
].
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
-format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) ->
- #{
- path => Path,
+format_export_info(
+ Info = #{
+ name := Name,
+ size := Size,
+ uri := URI,
+ timestamp := Timestamp,
+ transfer := {ClientId, FileId}
+ }
+) ->
+ Res = #{
+ name => iolist_to_binary(Name),
size => Size,
- timestamp => format_datetime(Timestamp)
- }.
+ timestamp => format_timestamp(Timestamp),
+ clientid => ClientId,
+ fileid => FileId,
+ uri => iolist_to_binary(URI)
+ },
+ case Info of
+ #{meta := Meta} ->
+ Res#{metadata => emqx_ft:encode_filemeta(Meta)};
+ #{} ->
+ Res
+ end.
-format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) ->
- iolist_to_binary(
- io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [
- Year, Month, Day, Hour, Minute, Second
- ])
- ).
+format_timestamp(Timestamp) ->
+ iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
+
+parse_node(NodeBin) ->
+ case emqx_misc:safe_to_existing_atom(NodeBin) of
+ {ok, Node} ->
+ Node;
+ {error, _} ->
+ throw({invalid, NodeBin})
+ end.
+
+parse_filepath(PathBin) ->
+ case filename:pathtype(PathBin) of
+ relative ->
+ ok;
+ absolute ->
+ throw({invalid, PathBin})
+ end,
+ PathComponents = filename:split(PathBin),
+ case lists:any(fun is_special_component/1, PathComponents) of
+ false ->
+ filename:join(PathComponents);
+ true ->
+ throw({invalid, PathBin})
+ end.
+
+is_special_component(<<".", _/binary>>) ->
+ true;
+is_special_component([$. | _]) ->
+ true;
+is_special_component(_) ->
+ false.
diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl
index ff845fee9..5489a232a 100644
--- a/apps/emqx_ft/src/emqx_ft_assembler.erl
+++ b/apps/emqx_ft/src/emqx_ft_assembler.erl
@@ -22,13 +22,13 @@
-export([callback_mode/0]).
-export([init/1]).
-export([handle_event/4]).
+-export([terminate/3]).
-record(st, {
storage :: _Storage,
transfer :: emqx_ft:transfer(),
assembly :: emqx_ft_assembly:t(),
- file :: {file:filename(), io:device(), term()} | undefined,
- hash
+ export :: _Export | undefined
}).
-define(NAME(Transfer), {n, l, {?MODULE, Transfer}}).
@@ -47,11 +47,11 @@ callback_mode() ->
handle_event_function.
init({Storage, Transfer, Size}) ->
+ _ = erlang:process_flag(trap_exit, true),
St = #st{
storage = Storage,
transfer = Transfer,
- assembly = emqx_ft_assembly:new(Size),
- hash = crypto:hash_init(sha256)
+ assembly = emqx_ft_assembly:new(Size)
},
{ok, idle, St}.
@@ -61,10 +61,10 @@ handle_event(info, kickoff, idle, St) ->
% We could wait for this message and handle it at the end of the assembling rather than at
% the beginning, however it would make error handling much more messier.
{next_state, list_local_fragments, St, ?internal([])};
-handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
+handle_event(internal, _, list_local_fragments, St = #st{}) ->
% TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
- NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+ NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(St#st.assembly, node(), Fragments)),
NSt = St#st{assembly = NAsm},
case emqx_ft_assembly:status(NAsm) of
complete ->
@@ -110,8 +110,8 @@ handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm),
Coverage = emqx_ft_assembly:coverage(Asm),
% TODO: better error handling
- {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
- {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
+ {ok, Export} = export_start(Filemeta, St),
+ {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])};
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO
% Currently, race is possible between getting segment info from the remote node and
@@ -119,15 +119,17 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO: pipelining
% TODO: better error handling
{ok, Content} = pread(Node, Segment, St),
- {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
- {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
+ {ok, NExport} = export_write(St#st.export, Content),
+ {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])};
handle_event(internal, _, {assemble, []}, St = #st{}) ->
{next_state, complete, St, ?internal([])};
-handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
- Filemeta = emqx_ft_assembly:filemeta(Asm),
- Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+handle_event(internal, _, complete, St = #st{}) ->
+ Result = export_complete(St#st.export),
ok = maybe_garbage_collect(Result, St),
- {stop, {shutdown, Result}}.
+ {stop, {shutdown, Result}, St#st{export = undefined}}.
+
+terminate(_Reason, _StateName, #st{export = Export}) ->
+ Export /= undefined andalso export_discard(Export).
pread(Node, Segment, St) when Node =:= node() ->
emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
@@ -136,8 +138,33 @@ pread(Node, Segment, St) ->
%%
-maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) ->
- Nodes = emqx_ft_assembly:nodes(St#st.assembly),
+export_start(Filemeta, #st{storage = Storage, transfer = Transfer}) ->
+ {ExporterMod, Exporter} = emqx_ft_storage_fs:exporter(Storage),
+ case ExporterMod:start_export(Exporter, Transfer, Filemeta) of
+ {ok, Export} ->
+ {ok, {ExporterMod, Export}};
+ {error, _} = Error ->
+ Error
+ end.
+
+export_write({ExporterMod, Export}, Content) ->
+ case ExporterMod:write(Export, Content) of
+ {ok, ExportNext} ->
+ {ok, {ExporterMod, ExportNext}};
+ {error, _} = Error ->
+ Error
+ end.
+
+export_complete({ExporterMod, Export}) ->
+ ExporterMod:complete(Export).
+
+export_discard({ExporterMod, Export}) ->
+ ExporterMod:discard(Export).
+
+%%
+
+maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
+ Nodes = emqx_ft_assembly:nodes(Asm),
emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
maybe_garbage_collect({error, _}, _St) ->
ok.
diff --git a/apps/emqx_ft/src/emqx_ft_fs_util.erl b/apps/emqx_ft/src/emqx_ft_fs_util.erl
new file mode 100644
index 000000000..198f4ccc5
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_fs_util.erl
@@ -0,0 +1,101 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_fs_util).
+
+-include_lib("snabbkaffe/include/trace.hrl").
+
+-export([read_decode_file/2]).
+
+-export([fold/4]).
+
+-type glob() :: ['*' | globfun()].
+-type globfun() ::
+ fun((_Filename :: file:name()) -> boolean()).
+-type foldfun(Acc) ::
+ fun(
+ (
+ _Filepath :: file:name(),
+ _Info :: file:file_info() | {error, _IoError},
+ _Stack :: [file:name()],
+ Acc
+ ) -> Acc
+ ).
+
+%%
+
+-spec read_decode_file(file:name(), fun((binary()) -> Value)) ->
+ {ok, Value} | {error, _IoError}.
+read_decode_file(Filepath, DecodeFun) ->
+ case file:read_file(Filepath) of
+ {ok, Content} ->
+ safe_decode(Content, DecodeFun);
+ {error, _} = Error ->
+ Error
+ end.
+
+safe_decode(Content, DecodeFun) ->
+ try
+ {ok, DecodeFun(Content)}
+ catch
+ C:E:Stacktrace ->
+ ?tp(warning, "safe_decode_failed", #{
+ class => C,
+ exception => E,
+ stacktrace => Stacktrace
+ }),
+ {error, corrupted}
+ end.
+
+-spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) ->
+ Acc.
+fold(Fun, Acc, Root, Glob) ->
+ fold(Fun, Acc, [], Root, Glob, []).
+
+fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) ->
+ case file:list_dir(filename:join(Root, Path)) of
+ {ok, Filenames} ->
+ lists:foldl(
+ fun(FN, Acc) ->
+ case matches_glob(Glob, FN) of
+ true when Path == [] ->
+ fold(Fun, Acc, FN, Root, Rest, [FN | Stack]);
+ true ->
+ fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]);
+ false ->
+ Acc
+ end
+ end,
+ AccIn,
+ Filenames
+ );
+ {error, enotdir} ->
+ AccIn;
+ {error, Reason} ->
+ Fun(Path, {error, Reason}, Stack, AccIn)
+ end;
+fold(Fun, AccIn, Filepath, Root, [], Stack) ->
+ case file:read_link_info(filename:join(Root, Filepath), [{time, posix}, raw]) of
+ {ok, Info} ->
+ Fun(Filepath, Info, Stack, AccIn);
+ {error, Reason} ->
+ Fun(Filepath, {error, Reason}, Stack, AccIn)
+ end.
+
+matches_glob('*', _) ->
+ true;
+matches_glob(FilterFun, Filename) when is_function(FilterFun) ->
+ FilterFun(Filename).
diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl
index d2b2b9299..37e2adafc 100644
--- a/apps/emqx_ft/src/emqx_ft_schema.erl
+++ b/apps/emqx_ft/src/emqx_ft_schema.erl
@@ -66,12 +66,33 @@ fields(local_storage) ->
desc => ?DESC("local_storage_root"),
required => false
}},
+ {exporter, #{
+ type => hoconsc:union([
+ ?REF(local_storage_exporter)
+ ]),
+ desc => ?DESC("local_storage_exporter"),
+ required => true
+ }},
{gc, #{
- type => hoconsc:ref(?MODULE, local_storage_gc),
+ type => ?REF(local_storage_gc),
desc => ?DESC("local_storage_gc"),
required => false
}}
];
+fields(local_storage_exporter) ->
+ [
+ {type, #{
+ type => local,
+ default => local,
+ required => false,
+ desc => ?DESC("local_storage_exporter_type")
+ }},
+ {root, #{
+ type => binary(),
+ desc => ?DESC("local_storage_exporter_root"),
+ required => false
+ }}
+ ];
fields(local_storage_gc) ->
[
{interval, #{
@@ -101,12 +122,15 @@ desc(file_transfer) ->
"File transfer settings";
desc(local_storage) ->
"File transfer local storage settings";
+desc(local_storage_exporter) ->
+ "Exporter settings for the File transfer local storage backend";
desc(local_storage_gc) ->
"Garbage collection settings for the File transfer local storage backend".
schema(filemeta) ->
#{
roots => [
+ % TODO nonempty
{name, hoconsc:mk(string(), #{required => true})},
{size, hoconsc:mk(non_neg_integer())},
{expire_at, hoconsc:mk(non_neg_integer())},
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl
index 0b8c38736..1d1c08ce9 100644
--- a/apps/emqx_ft/src/emqx_ft_storage.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage.erl
@@ -24,8 +24,7 @@
store_segment/2,
assemble/2,
- ready_transfers/0,
- get_ready_transfer/1,
+ exports/0,
with_storage_type/3
]
@@ -34,12 +33,19 @@
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
+-export_type([export_data/0]).
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
--type ready_transfer_id() :: term().
--type ready_transfer_info() :: map().
--type ready_transfer_data() :: binary() | qlc:query_handle().
+-type export_info() :: #{
+ transfer := emqx_ft:transfer(),
+ name := file:name(),
+ size := _Bytes :: non_neg_integer(),
+ uri => uri_string:uri_string(),
+ meta => emqx_ft:filemeta()
+}.
+
+-type export_data() :: binary() | qlc:query_handle().
%%--------------------------------------------------------------------
%% Behaviour
@@ -57,10 +63,9 @@
ok | {async, pid()} | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}.
--callback ready_transfers(storage()) ->
- {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
--callback get_ready_transfer(storage(), ready_transfer_id()) ->
- {ok, ready_transfer_data()} | {error, term()}.
+
+-callback exports(storage()) ->
+ {ok, [export_info()]} | {error, term()}.
%%--------------------------------------------------------------------
%% API
@@ -95,15 +100,11 @@ assemble(Transfer, Size) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Size).
--spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
-ready_transfers() ->
+-spec exports() ->
+ {ok, [export_info()]} | {error, term()}.
+exports() ->
Mod = mod(),
- Mod:ready_transfers(storage()).
-
--spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}.
-get_ready_transfer(ReadyTransferId) ->
- Mod = mod(),
- Mod:get_ready_transfer(storage(), ReadyTransferId).
+ Mod:exports(storage()).
-spec with_storage_type(atom(), atom(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
new file mode 100644
index 000000000..e0bffd444
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl
@@ -0,0 +1,352 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_exporter_fs).
+
+-include_lib("kernel/include/file.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% Exporter API
+-export([start_export/3]).
+-export([write/2]).
+-export([complete/1]).
+-export([discard/1]).
+
+-export([list_local/1]).
+-export([list_local/2]).
+-export([start_reader/3]).
+
+-export([list/1]).
+% -export([list/2]).
+
+-export_type([export/0]).
+
+-type options() :: _TODO.
+-type transfer() :: emqx_ft:transfer().
+-type filemeta() :: emqx_ft:filemeta().
+-type exportinfo() :: #{
+ transfer := transfer(),
+ name := file:name(),
+ uri := uri_string:uri_string(),
+ timestamp := emqx_datetime:epoch_second(),
+ size := _Bytes :: non_neg_integer(),
+ meta => filemeta()
+}.
+
+-type file_error() :: emqx_ft_storage_fs:file_error().
+
+-opaque export() :: #{
+ path := file:name(),
+ handle := io:device(),
+ result := file:name(),
+ meta := filemeta(),
+ hash := crypto:hash_state()
+}.
+
+-type reader() :: pid().
+
+-define(TEMPDIR, "tmp").
+-define(MANIFEST, ".MANIFEST.json").
+
+%% NOTE
+%% Bucketing of resulting files to accomodate the storage backend for considerably
+%% large (e.g. > 10s of millions) amount of files.
+-define(BUCKET_HASH, sha).
+
+%% 2 symbols = at most 256 directories on the upper level
+-define(BUCKET1_LEN, 2).
+%% 2 symbols = at most 256 directories on the second level
+-define(BUCKET2_LEN, 2).
+
+-define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
+ ?SLOG(notice, "filesystem_object_unexpected", #{
+ relpath => RelFilepath,
+ fileinfo => Fileinfo,
+ options => Options
+ })
+).
+
+-define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
+ ?SLOG(warning, "filesystem_object_inaccessible", #{
+ relpath => RelFilepath,
+ reason => Reason,
+ options => Options
+ })
+).
+
+%%
+
+-spec start_export(options(), transfer(), filemeta()) ->
+ {ok, export()} | {error, file_error()}.
+start_export(Options, Transfer, Filemeta = #{name := Filename}) ->
+ TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename),
+ ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename),
+ _ = filelib:ensure_dir(TempFilepath),
+ case file:open(TempFilepath, [write, raw, binary]) of
+ {ok, Handle} ->
+ {ok, #{
+ path => TempFilepath,
+ handle => Handle,
+ result => ResultFilepath,
+ meta => Filemeta,
+ hash => init_checksum(Filemeta)
+ }};
+ {error, _} = Error ->
+ Error
+ end.
+
+-spec write(export(), iodata()) ->
+ {ok, export()} | {error, file_error()}.
+write(Export = #{handle := Handle, hash := Ctx}, IoData) ->
+ case file:write(Handle, IoData) of
+ ok ->
+ {ok, Export#{hash := update_checksum(Ctx, IoData)}};
+ {error, _} = Error ->
+ Error
+ end.
+
+-spec complete(export()) ->
+ ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
+complete(
+ Export = #{
+ path := Filepath,
+ handle := Handle,
+ result := ResultFilepath,
+ meta := FilemetaIn,
+ hash := Ctx
+ }
+) ->
+ case verify_checksum(Ctx, FilemetaIn) of
+ {ok, Filemeta} ->
+ ok = file:close(Handle),
+ _ = filelib:ensure_dir(ResultFilepath),
+ _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)),
+ file:rename(Filepath, ResultFilepath);
+ {error, _} = Error ->
+ _ = discard(Export),
+ Error
+ end.
+
+-spec discard(export()) ->
+ ok.
+discard(#{path := Filepath, handle := Handle}) ->
+ ok = file:close(Handle),
+ file:delete(Filepath).
+
+%%
+
+-spec list_local(options(), transfer()) ->
+ {ok, [exportinfo(), ...]} | {error, file_error()}.
+list_local(Options, Transfer) ->
+ TransferRoot = mk_absdir(Options, Transfer, result),
+ case
+ emqx_ft_fs_util:fold(
+ fun
+ (_Path, {error, Reason}, [], []) ->
+ {error, Reason};
+ (_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) ->
+ RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
+ Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
+ [Info | Acc];
+ (RelFilepath, Fileinfo = #file_info{}, _, Acc) ->
+ ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
+ Acc;
+ (RelFilepath, {error, Reason}, _, Acc) ->
+ ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
+ Acc
+ end,
+ [],
+ TransferRoot,
+ [fun filter_manifest/1]
+ )
+ of
+ Infos = [_ | _] ->
+ {ok, Infos};
+ [] ->
+ {error, enoent};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+-spec list_local(options()) ->
+ {ok, #{transfer() => [exportinfo(), ...]}}.
+list_local(Options) ->
+ Pattern = [
+ _Bucket1 = '*',
+ _Bucket2 = '*',
+ _Rest = '*',
+ _ClientId = '*',
+ _FileId = '*',
+ fun filter_manifest/1
+ ],
+ Root = get_storage_root(Options),
+ {ok,
+ emqx_ft_fs_util:fold(
+ fun(RelFilepath, Info, Stack, Acc) ->
+ read_exportinfo(Options, RelFilepath, Info, Stack, Acc)
+ end,
+ [],
+ Root,
+ Pattern
+ )}.
+
+filter_manifest(?MANIFEST) ->
+ % Filename equals `?MANIFEST`, there should also be a manifest for it.
+ false;
+filter_manifest(Filename) ->
+ ?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing).
+
+read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) ->
+ [Filename, FileId, ClientId | _] = Stack,
+ Transfer = dirnames_to_transfer(ClientId, FileId),
+ Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
+ [Info | Acc];
+read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) ->
+ ?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
+ Acc;
+read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) ->
+ ?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
+ Acc.
+
+mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) ->
+ Root = get_storage_root(Options),
+ try_read_filemeta(
+ filename:join(Root, mk_manifest_filename(RelFilepath)),
+ #{
+ transfer => Transfer,
+ name => Filename,
+ uri => mk_export_uri(Options, RelFilepath),
+ timestamp => Fileinfo#file_info.mtime,
+ size => Fileinfo#file_info.size,
+ path => filename:join(Root, RelFilepath)
+ }
+ ).
+
+try_read_filemeta(Filepath, Info) ->
+ case emqx_ft_fs_util:read_decode_file(Filepath, fun decode_filemeta/1) of
+ {ok, Filemeta} ->
+ Info#{meta => Filemeta};
+ {error, Reason} ->
+ ?SLOG(warning, "filemeta_inaccessible", #{
+ path => Filepath,
+ reason => Reason
+ }),
+ Info
+ end.
+
+mk_export_uri(Options, RelFilepath) ->
+ % emqx_ft_storage_exporter_fs_api:mk_export_uri(Options, RelFilepath).
+ emqx_ft_api:mk_file_uri(Options, node(), RelFilepath).
+
+-spec start_reader(options(), file:name(), _Caller :: pid()) ->
+ {ok, reader()} | {error, enoent}.
+start_reader(Options, Filepath, CallerPid) ->
+ Root = get_storage_root(Options),
+ case filelib:safe_relative_path(Filepath, Root) of
+ SafeFilepath when SafeFilepath /= unsafe ->
+ AbsFilepath = filename:join(Root, SafeFilepath),
+ emqx_ft_storage_fs_reader:start_supervised(CallerPid, AbsFilepath);
+ unsafe ->
+ {error, enoent}
+ end.
+
+%%
+
+-spec list(options()) ->
+ {ok, [exportinfo(), ...]} | {error, file_error()}.
+list(_Options) ->
+ Nodes = mria_mnesia:running_nodes(),
+ Results = emqx_ft_storage_fs_proto_v1:list_exports(Nodes),
+ {GoodResults, BadResults} = lists:partition(
+ fun
+ ({_Node, {ok, {ok, _}}}) -> true;
+ (_) -> false
+ end,
+ lists:zip(Nodes, Results)
+ ),
+ length(BadResults) > 0 andalso
+ ?SLOG(warning, #{msg => "list_remote_exports_failed", failures => BadResults}),
+ {ok, [File || {_Node, {ok, {ok, Files}}} <- GoodResults, File <- Files]}.
+
+%%
+
+init_checksum(#{checksum := {Algo, _}}) ->
+ crypto:hash_init(Algo);
+init_checksum(#{}) ->
+ crypto:hash_init(sha256).
+
+update_checksum(Ctx, IoData) ->
+ crypto:hash_update(Ctx, IoData).
+
+verify_checksum(Ctx, Filemeta = #{checksum := {Algo, Digest}}) ->
+ case crypto:hash_final(Ctx) of
+ Digest ->
+ {ok, Filemeta};
+ Mismatch ->
+ {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
+ end;
+verify_checksum(Ctx, Filemeta = #{}) ->
+ Digest = crypto:hash_final(Ctx),
+ {ok, Filemeta#{checksum => {sha256, Digest}}}.
+
+%%
+
+-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
+
+encode_filemeta(Meta) ->
+ emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
+
+decode_filemeta(Binary) when is_binary(Binary) ->
+ ?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
+ case emqx_ft:decode_filemeta(Map) of
+ {ok, Meta} ->
+ Meta;
+ {error, Reason} ->
+ error(Reason)
+ end.
+
+mk_manifest_filename(Filename) when is_list(Filename) ->
+ Filename ++ ?MANIFEST;
+mk_manifest_filename(Filename) when is_binary(Filename) ->
+ <>.
+
+mk_temp_absfilepath(Options, Transfer, Filename) ->
+ Unique = erlang:unique_integer([positive]),
+ TempFilename = integer_to_list(Unique) ++ "." ++ Filename,
+ filename:join(mk_absdir(Options, Transfer, temporary), TempFilename).
+
+mk_absdir(Options, _Transfer, temporary) ->
+ filename:join([get_storage_root(Options), ?TEMPDIR]);
+mk_absdir(Options, Transfer, result) ->
+ filename:join([get_storage_root(Options) | mk_result_reldir(Transfer)]).
+
+mk_absfilepath(Options, Transfer, What, Filename) ->
+ filename:join(mk_absdir(Options, Transfer, What), Filename).
+
+mk_result_reldir(Transfer = {ClientId, FileId}) ->
+ Hash = mk_transfer_hash(Transfer),
+ <<
+ Bucket1:?BUCKET1_LEN/binary,
+ Bucket2:?BUCKET2_LEN/binary,
+ BucketRest/binary
+ >> = binary:encode_hex(Hash),
+ [Bucket1, Bucket2, BucketRest, ClientId, FileId].
+
+mk_transfer_hash(Transfer) ->
+ crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)).
+
+get_storage_root(Options) ->
+ maps:get(root, Options, filename:join([emqx:data_dir(), "ft", "exports"])).
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
index b8aef5276..4a613dbcf 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
@@ -29,6 +29,7 @@
-export([child_spec/1]).
+% Segments-related API
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([read_filemeta/2]).
@@ -43,22 +44,20 @@
-export([get_subdir/2]).
-export([get_subdir/3]).
--export([ready_transfers_local/1]).
--export([get_ready_transfer_local/3]).
+-export([exporter/1]).
--export([ready_transfers/1]).
--export([get_ready_transfer/2]).
-
--export([open_file/3]).
--export([complete/4]).
--export([write/2]).
--export([discard/1]).
+% Exporter-specific API
+-export([exports/1]).
+-export([exports_local/1]).
+-export([exports_local/2]).
-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
+-export_type([file_error/0]).
+
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().
@@ -70,8 +69,7 @@
}.
-type transferinfo() :: #{
- status := complete | incomplete,
- result => [filefrag({result, #{}})]
+ filemeta => filemeta()
}.
% TODO naming
@@ -85,16 +83,20 @@
-type filefrag() :: filefrag(
{filemeta, filemeta()}
| {segment, segmentinfo()}
- | {result, #{}}
).
-define(FRAGDIR, frags).
-define(TEMPDIR, tmp).
--define(RESULTDIR, result).
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").
-type storage() :: #{
+ root => file:name(),
+ exporter => exporter()
+}.
+
+-type exporter() :: #{
+ type := local,
root => file:name()
}.
@@ -138,7 +140,9 @@ store_filemeta(Storage, Transfer, Meta) ->
% about it too much now.
{error, conflict};
{error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
- write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta))
+ write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
+ {error, _} = Error ->
+ Error
end.
%% Store a segment in the backing filesystem.
@@ -153,17 +157,17 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
write_file_atomic(Storage, Transfer, Filepath, Content).
-spec read_filemeta(storage(), transfer()) ->
- {ok, filefrag({filemeta, filemeta()})} | {error, corrupted} | {error, file_error()}.
+ {ok, filemeta()} | {error, corrupted} | {error, file_error()}.
read_filemeta(Storage, Transfer) ->
Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
read_file(Filepath, fun decode_filemeta/1).
--spec list(storage(), transfer(), _What :: fragment | result) ->
+-spec list(storage(), transfer(), _What :: fragment) ->
% Some lower level errors? {error, notfound}?
% Result will contain zero or only one filemeta.
{ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
| {error, file_error()}.
-list(Storage, Transfer, What) ->
+list(Storage, Transfer, What = fragment) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
case file:list_dir(Dirname) of
{ok, Filenames} ->
@@ -172,18 +176,13 @@ list(Storage, Transfer, What) ->
% extremely bad luck is needed for that, e.g. concurrent assemblers with
% different filemetas from different nodes). This might be unexpected for a
% client given the current protocol, yet might be helpful in the future.
- {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
+ {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
{error, enoent} ->
{ok, []};
{error, _} = Error ->
Error
end.
-get_filefrag_fun_for(fragment) ->
- fun mk_filefrag/2;
-get_filefrag_fun_for(result) ->
- fun mk_result_filefrag/2.
-
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
@@ -213,102 +212,28 @@ assemble(Storage, Transfer, Size) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
{async, Pid}.
-get_ready_transfer(_Storage, ReadyTransferId) ->
- case parse_ready_transfer_id(ReadyTransferId) of
- {ok, {Node, Transfer}} ->
- try
- case emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, self(), Transfer) of
- {ok, ReaderPid} ->
- {ok, emqx_ft_storage_fs_reader:table(ReaderPid)};
- {error, _} = Error ->
- Error
- end
- catch
- error:Exc:Stacktrace ->
- ?SLOG(warning, #{
- msg => "get_ready_transfer_error",
- node => Node,
- transfer => Transfer,
- exception => Exc,
- stacktrace => Stacktrace
- }),
- {error, Exc};
- C:Exc:Stacktrace ->
- ?SLOG(warning, #{
- msg => "get_ready_transfer_fail",
- class => C,
- node => Node,
- transfer => Transfer,
- exception => Exc,
- stacktrace => Stacktrace
- }),
- {error, {C, Exc}}
- end;
- {error, _} = Error ->
- Error
+%%
+
+-spec exporter(storage()) -> {module(), _ExporterOptions}.
+exporter(Storage) ->
+ case maps:get(exporter, Storage) of
+ #{type := local} = Options ->
+ {emqx_ft_storage_exporter_fs, Options}
end.
-get_ready_transfer_local(Storage, CallerPid, Transfer) ->
- Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
- case file:list_dir(Dirname) of
- {ok, [Filename | _]} ->
- FullFilename = filename:join([Dirname, Filename]),
- emqx_ft_storage_fs_reader:start_supervised(CallerPid, FullFilename);
- {error, _} = Error ->
- Error
- end.
+exports(Storage) ->
+ {ExporterMod, ExporterOpts} = exporter(Storage),
+ ExporterMod:list(ExporterOpts).
-ready_transfers(_Storage) ->
- Nodes = mria_mnesia:running_nodes(),
- Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes),
- {GoodResults, BadResults} = lists:partition(
- fun
- ({ok, _}) -> true;
- (_) -> false
- end,
- Results
- ),
- case {GoodResults, BadResults} of
- {[], _} ->
- ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
- {error, no_nodes};
- {_, []} ->
- {ok, [File || {ok, Files} <- GoodResults, File <- Files]};
- {_, _} ->
- ?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
- {ok, [File || {ok, Files} <- GoodResults, File <- Files]}
- end.
+exports_local(Storage) ->
+ {ExporterMod, ExporterOpts} = exporter(Storage),
+ ExporterMod:list_local(ExporterOpts).
-ready_transfers_local(Storage) ->
- {ok, Transfers} = transfers(Storage),
- lists:filtermap(
- fun
- ({Transfer, #{status := complete, result := [Result | _]}}) ->
- {true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}};
- (_) ->
- false
- end,
- maps:to_list(Transfers)
- ).
+exports_local(Storage, Transfer) ->
+ {ExporterMod, ExporterOpts} = exporter(Storage),
+ ExporterMod:list_local(ExporterOpts, Transfer).
-ready_transfer_id({ClientId, FileId}) ->
- #{
- <<"node">> => atom_to_binary(node()),
- <<"clientid">> => ClientId,
- <<"fileid">> => FileId
- }.
-
-parse_ready_transfer_id(#{
- <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId
-}) ->
- case emqx_misc:safe_to_existing_atom(NodeBin) of
- {ok, Node} ->
- {ok, {Node, {ClientId, FileId}}};
- {error, _} ->
- {error, {invalid_node, NodeBin}}
- end;
-parse_ready_transfer_id(#{}) ->
- {error, invalid_file_id}.
+%%
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
@@ -345,17 +270,16 @@ transfers(Storage, ClientId, AccIn) ->
end.
read_transferinfo(Storage, Transfer, Acc) ->
- case list(Storage, Transfer, result) of
- {ok, Result = [_ | _]} ->
- Info = #{status => complete, result => Result},
- Acc#{Transfer => Info};
- {ok, []} ->
- Info = #{status => incomplete},
- Acc#{Transfer => Info};
- {error, _Reason} ->
- ?tp(warning, "list_result_failed", #{
+ case read_filemeta(Storage, Transfer) of
+ {ok, Filemeta} ->
+ Acc#{Transfer => #{filemeta => Filemeta}};
+ {error, enoent} ->
+ Acc#{Transfer => #{}};
+ {error, Reason} ->
+ ?tp(warning, "read_transferinfo_failed", #{
storage => Storage,
- transfer => Transfer
+ transfer => Transfer,
+ reason => Reason
}),
Acc
end.
@@ -365,7 +289,7 @@ read_transferinfo(Storage, Transfer, Acc) ->
get_subdir(Storage, Transfer) ->
mk_filedir(Storage, Transfer, []).
--spec get_subdir(storage(), transfer(), fragment | temporary | result) ->
+-spec get_subdir(storage(), transfer(), fragment | temporary) ->
file:name().
get_subdir(Storage, Transfer, What) ->
mk_filedir(Storage, Transfer, get_subdirs_for(What)).
@@ -373,84 +297,12 @@ get_subdir(Storage, Transfer, What) ->
get_subdirs_for(fragment) ->
[?FRAGDIR];
get_subdirs_for(temporary) ->
- [?TEMPDIR];
-get_subdirs_for(result) ->
- [?RESULTDIR].
-
-%%
-
--type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-
--spec open_file(storage(), transfer(), filemeta()) ->
- {ok, handle()} | {error, file_error()}.
-open_file(Storage, Transfer, Filemeta) ->
- Filename = maps:get(name, Filemeta),
- TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
- _ = filelib:ensure_dir(TempFilepath),
- case file:open(TempFilepath, [write, raw, binary]) of
- {ok, Handle} ->
- % TODO: preserve filemeta
- {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
- {error, _} = Error ->
- Error
- end.
-
--spec write(handle(), iodata()) ->
- {ok, handle()} | {error, file_error()}.
-write({Filepath, IoDevice, Ctx}, IoData) ->
- case file:write(IoDevice, IoData) of
- ok ->
- {ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}};
- {error, _} = Error ->
- Error
- end.
-
--spec complete(storage(), transfer(), filemeta(), handle()) ->
- ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
-complete(Storage, Transfer, Filemeta = #{name := Filename}, Handle = {Filepath, IoDevice, Ctx}) ->
- TargetFilepath = mk_filepath(Storage, Transfer, get_subdirs_for(result), Filename),
- case verify_checksum(Ctx, Filemeta) of
- ok ->
- ok = file:close(IoDevice),
- mv_temp_file(Filepath, TargetFilepath);
- {error, _} = Error ->
- _ = discard(Handle),
- Error
- end.
-
--spec discard(handle()) ->
- ok.
-discard({Filepath, IoDevice, _Ctx}) ->
- ok = file:close(IoDevice),
- file:delete(Filepath).
-
-init_checksum(#{checksum := {Algo, _}}) ->
- crypto:hash_init(Algo);
-init_checksum(#{}) ->
- undefined.
-
-update_checksum(Ctx, IoData) when Ctx /= undefined ->
- crypto:hash_update(Ctx, IoData);
-update_checksum(undefined, _IoData) ->
- undefined.
-
-verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
- case crypto:hash_final(Ctx) of
- Digest ->
- ok;
- Mismatch ->
- {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
- end;
-verify_checksum(undefined, _) ->
- ok.
+ [?TEMPDIR].
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
encode_filemeta(Meta) ->
- % TODO: Looks like this should be hocon's responsibility.
- Schema = emqx_ft_schema:schema(filemeta),
- Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}),
- emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
+ emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
decode_filemeta(Binary) when is_binary(Binary) ->
?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
@@ -490,33 +342,12 @@ try_list_dir(Dirname) ->
end.
get_storage_root(Storage) ->
- maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
+ maps:get(root, Storage, filename:join([emqx:data_dir(), "ft", "transfers"])).
-include_lib("kernel/include/file.hrl").
-read_file(Filepath) ->
- file:read_file(Filepath).
-
read_file(Filepath, DecodeFun) ->
- case read_file(Filepath) of
- {ok, Content} ->
- safe_decode(Content, DecodeFun);
- {error, _} = Error ->
- Error
- end.
-
-safe_decode(Content, DecodeFun) ->
- try
- {ok, DecodeFun(Content)}
- catch
- C:E:Stacktrace ->
- ?tp(warning, "safe_decode_failed", #{
- class => C,
- exception => E,
- stacktrace => Stacktrace
- }),
- {error, corrupted}
- end.
+ emqx_ft_fs_util:read_decode_file(Filepath, DecodeFun).
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
@@ -574,12 +405,6 @@ mk_filefrag(_Dirname, _Filename) ->
}),
false.
-mk_result_filefrag(Dirname, Filename) ->
- % NOTE
- % Any file in the `?RESULTDIR` subdir is currently considered the result of
- % the file transfer.
- mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
-
mk_filefrag(Dirname, Filename, Tag, Fun) ->
Filepath = filename:join(Dirname, Filename),
% TODO error handling?
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
index 58c5dbfdf..20e4f468d 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
@@ -308,28 +308,18 @@ is_same_filepath(P1, P2) when is_binary(P1) ->
filepath_to_binary(S) ->
unicode:characters_to_binary(S, unicode, file:native_name_encoding()).
-get_segments_ttl(Storage, Transfer) ->
+get_segments_ttl(Storage, TransferInfo) ->
{MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
- clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(Storage, Transfer)).
+ clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)).
-try_get_filemeta_ttl(Storage, Transfer) ->
- case emqx_ft_storage_fs:read_filemeta(Storage, Transfer) of
- {ok, Filemeta} ->
- maps:get(segments_ttl, Filemeta, undefined);
- {error, _} ->
- undefined
- end.
+try_get_filemeta_ttl(#{filemeta := Filemeta}) ->
+ maps:get(segments_ttl, Filemeta, undefined);
+try_get_filemeta_ttl(#{}) ->
+ undefined.
clamp(Min, Max, V) ->
min(Max, max(Min, V)).
-% try_collect(_Subject, ok = Result, Then, _Stats) ->
-% Then(Result);
-% try_collect(_Subject, {ok, Result}, Then, _Stats) ->
-% Then(Result);
-% try_collect(Subject, {error, _} = Error, _Then, Stats) ->
-% register_gcstat_error(Subject, Error, Stats).
-
%%
init_gcstats() ->
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl
index 7e19dd322..dbd0cd6dc 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl
@@ -23,8 +23,8 @@
-export([
list_local/2,
pread_local/4,
- get_ready_transfer_local/2,
- ready_transfers_local/0
+ list_exports_local/0,
+ read_export_file_local/2
]).
list_local(Transfer, What) ->
@@ -33,8 +33,18 @@ list_local(Transfer, What) ->
pread_local(Transfer, Frag, Offset, Size) ->
emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
-get_ready_transfer_local(CallerPid, Transfer) ->
- emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [CallerPid, Transfer]).
+list_exports_local() ->
+ case emqx_ft_storage:with_storage_type(local, exporter, []) of
+ {emqx_ft_storage_exporter_fs, Options} ->
+ emqx_ft_storage_exporter_fs:list_local(Options);
+ InvalidExporter ->
+ {error, {invalid_exporter, InvalidExporter}}
+ end.
-ready_transfers_local() ->
- emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).
+read_export_file_local(Filepath, CallerPid) ->
+ case emqx_ft_storage:with_storage_type(local, exporter, []) of
+ {emqx_ft_storage_exporter_fs, Options} ->
+ emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid);
+ InvalidExporter ->
+ {error, {invalid_exporter, InvalidExporter}}
+ end.
diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl
index e2c4c93d7..f152928fe 100644
--- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl
+++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl
@@ -22,8 +22,10 @@
-export([multilist/3]).
-export([pread/5]).
--export([ready_transfers/1]).
--export([get_ready_transfer/3]).
+
+%% TODO: These should be defined in a separate BPAPI
+-export([list_exports/1]).
+-export([read_export_file/3]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
@@ -44,19 +46,16 @@ multilist(Nodes, Transfer, What) ->
pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
--spec ready_transfers([node()]) ->
- [
- {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
- | {error, term()}
- | {exit, term()}
- | {throw, term()}
- ].
-ready_transfers(Nodes) ->
- erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).
+%%
--spec get_ready_transfer(node(), pid(), emqx_ft_storage:ready_transfer_id()) ->
- {ok, emqx_ft_storage:ready_transfer_data()}
+-spec list_exports([node()]) ->
+ emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]).
+list_exports(Nodes) ->
+ erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_exports_local, []).
+
+-spec read_export_file(node(), file:name(), pid()) ->
+ {ok, emqx_ft_storage:export_data()}
| {error, term()}
| no_return().
-get_ready_transfer(Node, CallerPid, ReadyTransferId) ->
- erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [CallerPid, ReadyTransferId]).
+read_export_file(Node, Filepath, CallerPid) ->
+ erpc:call(Node, emqx_ft_storage_fs_proxy, read_export_file_local, [Filepath, CallerPid]).
diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl
index 9c9087732..5fb384c16 100644
--- a/apps/emqx_ft/test/emqx_ft_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl
@@ -58,8 +58,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
- Root = emqx_ft_test_helpers:ft_root(Config, node()),
- emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
+ emqx_ft_test_helpers:load_config(#{
+ storage => emqx_ft_test_helpers:local_storage(Config)
+ });
(_) ->
ok
end.
@@ -108,8 +109,9 @@ mk_cluster_specs(Config) ->
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
{env_handler, fun
(emqx_ft) ->
- Root = emqx_ft_test_helpers:ft_root(Config, node()),
- emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
+ emqx_ft_test_helpers:load_config(#{
+ storage => emqx_ft_test_helpers:local_storage(Config)
+ });
(_) ->
ok
end}
@@ -194,11 +196,10 @@ t_simple_transfer(Config) ->
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
),
- [ReadyTransferId] = list_ready_transfers(?config(clientid, Config)),
- {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
+ [Export] = list_exports(?config(clientid, Config)),
?assertEqual(
- iolist_to_binary(Data),
- iolist_to_binary(qlc:eval(TableQH))
+ {ok, iolist_to_binary(Data)},
+ read_export(Export)
).
t_meta_conflict(Config) ->
@@ -424,11 +425,10 @@ t_switch_node(Config) ->
%% Now check consistency of the file
- [ReadyTransferId] = list_ready_transfers(ClientId),
- {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
+ [Export] = list_exports(ClientId),
?assertEqual(
- iolist_to_binary(Data),
- iolist_to_binary(qlc:eval(TableQH))
+ {ok, iolist_to_binary(Data)},
+ read_export(Export)
).
t_assemble_crash(Config) ->
@@ -501,27 +501,30 @@ t_unreliable_migrating_client(Config) ->
],
_Context = run_commands(Commands, Context),
- ReadyTransferIds = list_ready_transfers(?config(clientid, Config)),
+ Exports = list_exports(?config(clientid, Config)),
% NOTE
% The cluster had 2 assemblers running on two different nodes, because client sent `fin`
% twice. This is currently expected, files must be identical anyway.
- Node1Bin = atom_to_binary(Node1),
- NodeSelfBin = atom_to_binary(NodeSelf),
+ Node1Str = atom_to_list(Node1),
+ NodeSelfStr = atom_to_list(NodeSelf),
?assertMatch(
- [#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}],
- lists:sort(ReadyTransferIds)
+ [#{"node" := Node1Str}, #{"node" := NodeSelfStr}],
+ lists:map(
+ fun(#{uri := URIString}) ->
+ #{query := QS} = uri_string:parse(URIString),
+ uri_string:dissect_query(QS)
+ end,
+ lists:sort(Exports)
+ )
),
[
- begin
- {ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id),
- ?assertEqual(
- Payload,
- iolist_to_binary(qlc:eval(TableQH))
- )
- end
- || Id <- ReadyTransferIds
+ ?assertEqual(
+ {ok, Payload},
+ read_export(Export)
+ )
+ || Export <- Exports
].
run_commands(Commands, Context) ->
@@ -620,10 +623,10 @@ meta(FileName, Data) ->
size => byte_size(FullData)
}.
-list_ready_transfers(ClientId) ->
- {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
- [
- Id
- || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers,
- CId == ClientId
- ].
+list_exports(ClientId) ->
+ {ok, Exports} = emqx_ft_storage:exports(),
+ [Export || Export = #{transfer := {CId, _}} <- Exports, CId == ClientId].
+
+read_export(#{path := AbsFilepath}) ->
+ % TODO: only works for the local filesystem exporter right now
+ file:read_file(AbsFilepath).
diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
index 7b191e229..aff4d864c 100644
--- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
@@ -24,7 +24,7 @@
-include_lib("emqx/include/asserts.hrl").
--import(emqx_mgmt_api_test_util, [request/3, uri/1]).
+-import(emqx_mgmt_api_test_util, [uri/1]).
all() -> emqx_common_test_helpers:all(?MODULE).
@@ -41,8 +41,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
- Root = emqx_ft_test_helpers:ft_root(Config, node()),
- emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
+ emqx_ft_test_helpers:load_config(#{
+ storage => emqx_ft_test_helpers:local_storage(Config)
+ });
(_) ->
ok
end.
@@ -61,40 +62,25 @@ t_list_ready_transfers(Config) ->
ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),
- {ok, 200, Response} = request(get, uri(["file_transfer", "files"])),
-
- #{<<"files">> := Files} = emqx_json:decode(Response, [return_maps]),
+ {ok, 200, #{<<"files">> := Files}} =
+ request(get, uri(["file_transfer", "files"]), fun json/1),
?assertInclude(
- #{<<"id">> := #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}},
+ #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>},
Files
).
-%% This shouldn't happen in real life
-%% but we need to test it anyway
-t_list_ready_transfers_no_nodes(_Config) ->
- _ = meck:new(mria_mnesia, [passthrough]),
- _ = meck:expect(mria_mnesia, running_nodes, fun() -> [] end),
-
- ?assertMatch(
- {ok, 503, _},
- request(get, uri(["file_transfer", "files"]))
- ).
-
t_download_transfer(Config) ->
ClientId = client_id(Config),
ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),
?assertMatch(
- {ok, 503, _},
+ {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
request(
get,
- uri(["file_transfer", "file"]) ++
- query(#{
- clientid => ClientId,
- fileid => <<"f1">>
- })
+ uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
+ fun json/1
)
),
@@ -104,8 +90,7 @@ t_download_transfer(Config) ->
get,
uri(["file_transfer", "file"]) ++
query(#{
- clientid => ClientId,
- fileid => <<"f1">>,
+ fileref => <<"f1">>,
node => <<"nonode@nohost">>
})
)
@@ -117,22 +102,16 @@ t_download_transfer(Config) ->
get,
uri(["file_transfer", "file"]) ++
query(#{
- clientid => ClientId,
- fileid => <<"unknown_file">>,
+ fileref => <<"unknown_file">>,
node => node()
})
)
),
- {ok, 200, Response} = request(
- get,
- uri(["file_transfer", "file"]) ++
- query(#{
- clientid => ClientId,
- fileid => <<"f1">>,
- node => node()
- })
- ),
+ {ok, 200, #{<<"files">> := [File]}} =
+ request(get, uri(["file_transfer", "files"]), fun json/1),
+
+ {ok, 200, Response} = request(get, uri([]) ++ maps:get(<<"uri">>, File)),
?assertEqual(
<<"data">>,
@@ -147,7 +126,18 @@ client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8).
request(Method, Url) ->
- request(Method, Url, []).
+ emqx_mgmt_api_test_util:request(Method, Url, []).
+
+request(Method, Url, Decoder) when is_function(Decoder) ->
+ case emqx_mgmt_api_test_util:request(Method, Url, []) of
+ {ok, Code, Body} ->
+ {ok, Code, Decoder(Body)};
+ Otherwise ->
+ Otherwise
+ end.
+
+json(Body) when is_binary(Body) ->
+ emqx_json:decode(Body, [return_maps]).
query(Params) ->
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
index 8df471deb..4c7c0f08c 100644
--- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
@@ -47,6 +47,7 @@ init_per_testcase(TC, Config) ->
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
{storage_root, "file_transfer_root"},
+ {exports_root, "file_transfer_exports"},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
@@ -85,11 +86,12 @@ t_assemble_empty_transfer(Config) ->
),
Status = complete_assemble(Storage, Transfer, 0),
?assertEqual({shutdown, ok}, Status),
- {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
- ?assertEqual(
- {error, eof},
- emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
- ),
+ {ok, [_Result = #{size := _Size = 0}]} =
+ emqx_ft_storage_fs:exports_local(Storage, Transfer),
+ % ?assertEqual(
+ % {error, eof},
+ % emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
+ % ),
ok.
t_assemble_complete_local_transfer(Config) ->
@@ -133,12 +135,13 @@ t_assemble_complete_local_transfer(Config) ->
{ok, [
#{
size := TransferSize,
- fragment := {result, #{}}
+ meta := #{}
}
]},
- emqx_ft_storage_fs:list(Storage, Transfer, result)
+ emqx_ft_storage_fs:exports_local(Storage, Transfer)
),
- {ok, [#{path := AssemblyFilename}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
+ {ok, [#{path := AssemblyFilename}]} =
+ emqx_ft_storage_fs:exports_local(Storage, Transfer),
?assertMatch(
{ok, #file_info{type = regular, size = TransferSize}},
file:read_file_info(AssemblyFilename)
@@ -191,18 +194,23 @@ complete_assemble(Storage, Transfer, Size, Timeout) ->
t_list_transfers(Config) ->
Storage = storage(Config),
+ {ok, Exports} = emqx_ft_storage_fs:exports_local(Storage),
?assertMatch(
- {ok, #{
- {?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
- status := complete,
- result := [#{path := _, size := 0, fragment := {result, _}}]
+ [
+ #{
+ transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>},
+ path := _,
+ size := Size,
+ meta := #{name := "topsecret.pdf"}
},
- {?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
- status := complete,
- result := [#{path := _, size := Size, fragment := {result, _}}]
+ #{
+ transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>},
+ path := _,
+ size := 0,
+ meta := #{name := "important.pdf"}
}
- }} when Size > 0,
- emqx_ft_storage_fs:transfers(Storage)
+ ] when Size > 0,
+ lists:sort(Exports)
).
%%
@@ -232,5 +240,9 @@ mk_fileid() ->
storage(Config) ->
#{
type => local,
- root => ?config(storage_root, Config)
+ root => ?config(storage_root, Config),
+ exporter => #{
+ type => local,
+ root => ?config(exports_root, Config)
+ }
}.
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
index 314b4a5f2..e43abf095 100644
--- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
@@ -56,7 +56,16 @@ t_update_config(_Config) ->
{ok, _},
emqx_conf:update(
[file_transfer],
- #{<<"storage">> => #{<<"type">> => <<"local">>, <<"root">> => <<"/tmp/path">>}},
+ #{
+ <<"storage">> => #{
+ <<"type">> => <<"local">>,
+ <<"root">> => <<"/tmp/path">>,
+ <<"exporter">> => #{
+ <<"type">> => <<"local">>,
+ <<"root">> => <<"/tmp/exports">>
+ }
+ }
+ },
#{}
)
),
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
index 5551cce27..5b3f56b7d 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
@@ -22,11 +22,8 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
--include_lib("emqx/include/asserts.hrl").
-
all() ->
[
- {group, single_node},
{group, cluster}
].
@@ -34,7 +31,6 @@ all() ->
groups() ->
[
- {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- ?CLUSTER_CASES},
{cluster, [sequence], ?CLUSTER_CASES}
].
@@ -48,8 +44,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
- Root = emqx_ft_test_helpers:ft_root(Config, node()),
- emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
+ emqx_ft_test_helpers:load_config(#{
+ storage => emqx_ft_test_helpers:local_storage(Config)
+ });
(_) ->
ok
end.
@@ -74,47 +71,19 @@ end_per_group(_Group, _Config) ->
%% Tests
%%--------------------------------------------------------------------
-t_invalid_ready_transfer_id(Config) ->
- ?assertMatch(
- {error, _},
- emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
- <<"clientid">> => client_id(Config),
- <<"fileid">> => <<"fileid">>,
- <<"node">> => atom_to_binary('nonexistent@127.0.0.1')
- })
- ),
- ?assertMatch(
- {error, _},
- emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
- <<"clientid">> => client_id(Config),
- <<"fileid">> => <<"fileid">>,
- <<"node">> => <<"nonexistent_as_atom@127.0.0.1">>
- })
- ),
- ?assertMatch(
- {error, _},
- emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
- <<"clientid">> => client_id(Config),
- <<"fileid">> => <<"nonexistent_file">>,
- <<"node">> => node()
- })
- ).
-
t_multinode_ready_transfers(Config) ->
Node1 = ?config(additional_node, Config),
- ok = emqx_ft_test_helpers:upload_file(<<"c1">>, <<"f1">>, <<"data">>, Node1),
+ ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1),
Node2 = node(),
- ok = emqx_ft_test_helpers:upload_file(<<"c2">>, <<"f2">>, <<"data">>, Node2),
+ ok = emqx_ft_test_helpers:upload_file(<<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2),
- ?assertInclude(
- #{<<"clientid">> := <<"c1">>, <<"fileid">> := <<"f1">>},
- ready_transfer_ids(Config)
- ),
-
- ?assertInclude(
- #{<<"clientid">> := <<"c2">>, <<"fileid">> := <<"f2">>},
- ready_transfer_ids(Config)
+ ?assertMatch(
+ [
+ #{transfer := {<<"c/1">>, <<"f:1">>}, name := "fn1"},
+ #{transfer := {<<"c/2">>, <<"f:2">>}, name := "fn2"}
+ ],
+ lists:sort(list_exports(Config))
).
%%--------------------------------------------------------------------
@@ -127,13 +96,13 @@ client_id(Config) ->
storage(Config) ->
#{
type => local,
- root => ft_root(Config)
+ root => emqx_ft_test_helpers:root(Config, node(), ["transfers"]),
+ exporter => #{
+ type => local,
+ root => emqx_ft_test_helpers:root(Config, node(), ["exports"])
+ }
}.
-ft_root(Config) ->
- emqx_ft_test_helpers:ft_root(Config, node()).
-
-ready_transfer_ids(Config) ->
- {ok, ReadyTransfers} = emqx_ft_storage_fs:ready_transfers(storage(Config)),
- {ReadyTransferIds, _} = lists:unzip(ReadyTransfers),
- ReadyTransferIds.
+list_exports(Config) ->
+ {ok, Exports} = emqx_ft_storage_fs:exports(storage(Config)),
+ Exports.
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index 89e9eb970..25cd200f1 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -41,7 +41,14 @@ init_per_testcase(TC, Config) ->
emqx_ft,
fun(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
- storage => #{type => local, root => mk_root(TC, Config)}
+ storage => #{
+ type => local,
+ root => emqx_ft_test_helpers:root(Config, node(), [TC, transfers]),
+ exporter => #{
+ type => local,
+ root => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
+ }
+ }
})
end
),
@@ -53,9 +60,6 @@ end_per_testcase(_TC, _Config) ->
ok = application:stop(emqx_ft),
ok.
-mk_root(TC, Config) ->
- filename:join([?config(priv_dir, Config), "file_transfer", TC, atom_to_list(node())]).
-
%%
-define(NSEGS(Filesize, SegmentSize), (ceil(Filesize / SegmentSize) + 1)).
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
index b756f8034..e62c74d81 100644
--- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl
+++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
@@ -30,7 +30,7 @@ start_additional_node(Config, Name) ->
{configure_gen_rpc, true},
{env_handler, fun
(emqx_ft) ->
- load_config(#{storage => #{type => local, root => ft_root(Config, node())}});
+ load_config(#{storage => local_storage(Config)});
(_) ->
ok
end}
@@ -43,6 +43,13 @@ stop_additional_node(Node) ->
ok = emqx_common_test_helpers:stop_slave(Node),
ok.
+local_storage(Config) ->
+ #{
+ type => local,
+ root => root(Config, node(), [transfers]),
+ exporter => #{type => local, root => root(Config, node(), [exports])}
+ }.
+
load_config(Config) ->
emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}).
@@ -50,10 +57,8 @@ tcp_port(Node) ->
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
Port.
-ft_root(Config, Node) ->
- filename:join([
- ?config(priv_dir, Config), <<"file_transfer">>, atom_to_binary(Node)
- ]).
+root(Config, Node, Tail) ->
+ filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
upload_file(ClientId, FileId, Data, Node) ->
Port = tcp_port(Node),