feat(ft): introduce exporter concept in local storage backend

The exporter is responsible for keeping fully transferred and
successfully assembled files. This was on the local storage itself
before. This abstraction is needed to give us an ability to
support S3 destinations more easily, just by swapping the storage
exporter.

Also implement local filesystem exporter and reimplement parts of
the `emqx_ft` API on top of it.
This commit is contained in:
Andrew Mayorov 2023-03-10 15:27:50 +03:00 committed by Ilya Averyanov
parent 2880c8f4eb
commit 4132f5a5fb
20 changed files with 936 additions and 502 deletions

View File

@ -1,5 +1,8 @@
file_transfer {
storage {
type = local
exporter {
type = local
}
}
}

View File

@ -13,7 +13,7 @@ emqx_ft_schema {
local_type {
desc {
en: "Use local file system to store uploaded files and temporary data."
en: "Use local file system to store uploaded fragments and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
}
label: {
@ -24,7 +24,7 @@ emqx_ft_schema {
local_storage_root {
desc {
en: "File system path to keep uploaded files and temporary data."
en: "File system path to keep uploaded fragments and temporary data."
zh: "保存上传文件和临时数据的文件系统路径。"
}
label: {
@ -33,6 +33,40 @@ emqx_ft_schema {
}
}
local_storage_exporter {
desc {
en: "Exporter for the local file system storage backend.<br/>"
"Exporter defines where and how fully transferred and assembled files are stored."
zh: ""
}
label: {
en: "Local Storage Exporter"
zh: ""
}
}
local_storage_exporter_type {
desc {
en: "Type of the Exporter to use."
zh: ""
}
label: {
en: "Local Storage Exporter Type"
zh: ""
}
}
local_storage_exporter_root {
desc {
en: "File system path to keep uploaded files."
zh: ""
}
label: {
en: "Local Filesystem Exporter Root"
zh: ""
}
}
local_storage_gc {
desc {
en: "Garbage collection settings for the intermediate and temporary files in the local file system."

View File

@ -33,7 +33,8 @@
]).
-export([
decode_filemeta/1
decode_filemeta/1,
encode_filemeta/1
]).
-export([on_complete/4]).
@ -114,6 +115,11 @@ decode_filemeta(Map) when is_map(Map) ->
{error, {invalid_filemeta, Error}}
end.
encode_filemeta(Meta = #{}) ->
% TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta),
hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}).
%%--------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------

View File

@ -40,6 +40,10 @@
'/file_transfer/file'/2
]).
-export([
mk_file_uri/3
]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
namespace() -> "file_transfer".
@ -69,6 +73,11 @@ schema("/file_transfer/files") ->
}
};
schema("/file_transfer/file") ->
% TODO
% This is conceptually another API, because this logic is inherent only to the
% local filesystem exporter. Ideally, we won't even publish it if `emqx_ft` is
% configured with another exporter. Accordingly, things that look too specific
% for this module (i.e. `parse_filepath/1`) should go away in another API module.
#{
'operationId' => '/file_transfer/file',
get => #{
@ -77,8 +86,7 @@ schema("/file_transfer/file") ->
description => ?DESC("file_get"),
parameters => [
ref(file_node),
ref(file_clientid),
ref(file_id)
ref(file_ref)
],
responses => #{
200 => <<"Operation success">>,
@ -91,32 +99,40 @@ schema("/file_transfer/file") ->
}.
'/file_transfer/files'(get, #{}) ->
case emqx_ft_storage:ready_transfers() of
case emqx_ft_storage:exports() of
{ok, Transfers} ->
FormattedTransfers = lists:map(
fun({Id, Info}) ->
#{id => Id, info => format_file_info(Info)}
end,
Transfers
),
{200, #{<<"files">> => FormattedTransfers}};
{200, #{<<"files">> => lists:map(fun format_export_info/1, Transfers)}};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
'/file_transfer/file'(get, #{query_string := Query}) ->
case emqx_ft_storage:get_ready_transfer(Query) of
{ok, FileData} ->
{200,
#{
<<"content-type">> => <<"application/data">>,
<<"content-disposition">> => <<"attachment">>
},
FileData};
{error, enoent} ->
{404, error_msg('NOT_FOUND', <<"Not found">>)};
{error, Error} ->
?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
try
Node = parse_node(maps:get(<<"node">>, Query)),
Filepath = parse_filepath(maps:get(<<"fileref">>, Query)),
case emqx_ft_storage_fs_proto_v1:read_export_file(Node, Filepath, self()) of
{ok, ReaderPid} ->
FileData = emqx_ft_storage_fs_reader:table(ReaderPid),
{200,
#{
<<"content-type">> => <<"application/data">>,
<<"content-disposition">> => <<"attachment">>
},
FileData};
{error, enoent} ->
{404, error_msg('NOT_FOUND', <<"Not found">>)};
{error, Error} ->
?SLOG(warning, #{msg => "get_ready_transfer_fail", error => Error}),
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end
catch
throw:{invalid, Param} ->
{404,
error_msg(
'NOT_FOUND',
iolist_to_binary(["Invalid query parameter: ", Param])
)};
error:{erpc, noconnection} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
@ -124,46 +140,100 @@ error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
-spec fields(hocon_schema:name()) -> hocon_schema:fields().
fields(file_ref) ->
[
{fileref,
hoconsc:mk(binary(), #{
in => query,
desc => <<"File reference">>,
example => <<"file1">>,
required => true
})}
];
fields(file_node) ->
Desc = <<"File Node">>,
Meta = #{
in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false
},
[{node, hoconsc:mk(binary(), Meta)}];
fields(file_clientid) ->
Desc = <<"File ClientId">>,
Meta = #{
in => query, desc => Desc, example => <<"client1">>, required => false
},
[{clientid, hoconsc:mk(binary(), Meta)}];
fields(file_id) ->
Desc = <<"File">>,
Meta = #{
in => query, desc => Desc, example => <<"file1">>, required => false
},
[{fileid, hoconsc:mk(binary(), Meta)}].
[
{node,
hoconsc:mk(binary(), #{
in => query,
desc => <<"Node under which the file is located">>,
example => atom_to_list(node()),
required => true
})}
].
roots() ->
[
file_node,
file_clientid,
file_id
file_ref
].
mk_file_uri(_Options, Node, Filepath) ->
% TODO: qualify with `?BASE_PATH`
[
"/file_transfer/file?",
uri_string:compose_query([
{"node", atom_to_list(Node)},
{"fileref", Filepath}
])
].
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) ->
#{
path => Path,
format_export_info(
Info = #{
name := Name,
size := Size,
uri := URI,
timestamp := Timestamp,
transfer := {ClientId, FileId}
}
) ->
Res = #{
name => iolist_to_binary(Name),
size => Size,
timestamp => format_datetime(Timestamp)
}.
timestamp => format_timestamp(Timestamp),
clientid => ClientId,
fileid => FileId,
uri => iolist_to_binary(URI)
},
case Info of
#{meta := Meta} ->
Res#{metadata => emqx_ft:encode_filemeta(Meta)};
#{} ->
Res
end.
format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) ->
iolist_to_binary(
io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [
Year, Month, Day, Hour, Minute, Second
])
).
format_timestamp(Timestamp) ->
iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
parse_node(NodeBin) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
Node;
{error, _} ->
throw({invalid, NodeBin})
end.
parse_filepath(PathBin) ->
case filename:pathtype(PathBin) of
relative ->
ok;
absolute ->
throw({invalid, PathBin})
end,
PathComponents = filename:split(PathBin),
case lists:any(fun is_special_component/1, PathComponents) of
false ->
filename:join(PathComponents);
true ->
throw({invalid, PathBin})
end.
is_special_component(<<".", _/binary>>) ->
true;
is_special_component([$. | _]) ->
true;
is_special_component(_) ->
false.

View File

@ -22,13 +22,13 @@
-export([callback_mode/0]).
-export([init/1]).
-export([handle_event/4]).
-export([terminate/3]).
-record(st, {
storage :: _Storage,
transfer :: emqx_ft:transfer(),
assembly :: emqx_ft_assembly:t(),
file :: {file:filename(), io:device(), term()} | undefined,
hash
export :: _Export | undefined
}).
-define(NAME(Transfer), {n, l, {?MODULE, Transfer}}).
@ -47,11 +47,11 @@ callback_mode() ->
handle_event_function.
init({Storage, Transfer, Size}) ->
_ = erlang:process_flag(trap_exit, true),
St = #st{
storage = Storage,
transfer = Transfer,
assembly = emqx_ft_assembly:new(Size),
hash = crypto:hash_init(sha256)
assembly = emqx_ft_assembly:new(Size)
},
{ok, idle, St}.
@ -61,10 +61,10 @@ handle_event(info, kickoff, idle, St) ->
% We could wait for this message and handle it at the end of the assembling rather than at
% the beginning, however it would make error handling much more messier.
{next_state, list_local_fragments, St, ?internal([])};
handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
handle_event(internal, _, list_local_fragments, St = #st{}) ->
% TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(St#st.assembly, node(), Fragments)),
NSt = St#st{assembly = NAsm},
case emqx_ft_assembly:status(NAsm) of
complete ->
@ -110,8 +110,8 @@ handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm),
Coverage = emqx_ft_assembly:coverage(Asm),
% TODO: better error handling
{ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
{next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
{ok, Export} = export_start(Filemeta, St),
{next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])};
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO
% Currently, race is possible between getting segment info from the remote node and
@ -119,15 +119,17 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO: pipelining
% TODO: better error handling
{ok, Content} = pread(Node, Segment, St),
{ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
{next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
{ok, NExport} = export_write(St#st.export, Content),
{next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])};
handle_event(internal, _, {assemble, []}, St = #st{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm),
Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
handle_event(internal, _, complete, St = #st{}) ->
Result = export_complete(St#st.export),
ok = maybe_garbage_collect(Result, St),
{stop, {shutdown, Result}}.
{stop, {shutdown, Result}, St#st{export = undefined}}.
terminate(_Reason, _StateName, #st{export = Export}) ->
Export /= undefined andalso export_discard(Export).
pread(Node, Segment, St) when Node =:= node() ->
emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
@ -136,8 +138,33 @@ pread(Node, Segment, St) ->
%%
maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) ->
Nodes = emqx_ft_assembly:nodes(St#st.assembly),
export_start(Filemeta, #st{storage = Storage, transfer = Transfer}) ->
{ExporterMod, Exporter} = emqx_ft_storage_fs:exporter(Storage),
case ExporterMod:start_export(Exporter, Transfer, Filemeta) of
{ok, Export} ->
{ok, {ExporterMod, Export}};
{error, _} = Error ->
Error
end.
export_write({ExporterMod, Export}, Content) ->
case ExporterMod:write(Export, Content) of
{ok, ExportNext} ->
{ok, {ExporterMod, ExportNext}};
{error, _} = Error ->
Error
end.
export_complete({ExporterMod, Export}) ->
ExporterMod:complete(Export).
export_discard({ExporterMod, Export}) ->
ExporterMod:discard(Export).
%%
maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
Nodes = emqx_ft_assembly:nodes(Asm),
emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
maybe_garbage_collect({error, _}, _St) ->
ok.

View File

@ -0,0 +1,101 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_fs_util).
-include_lib("snabbkaffe/include/trace.hrl").
-export([read_decode_file/2]).
-export([fold/4]).
-type glob() :: ['*' | globfun()].
-type globfun() ::
fun((_Filename :: file:name()) -> boolean()).
-type foldfun(Acc) ::
fun(
(
_Filepath :: file:name(),
_Info :: file:file_info() | {error, _IoError},
_Stack :: [file:name()],
Acc
) -> Acc
).
%%
-spec read_decode_file(file:name(), fun((binary()) -> Value)) ->
{ok, Value} | {error, _IoError}.
read_decode_file(Filepath, DecodeFun) ->
case file:read_file(Filepath) of
{ok, Content} ->
safe_decode(Content, DecodeFun);
{error, _} = Error ->
Error
end.
safe_decode(Content, DecodeFun) ->
try
{ok, DecodeFun(Content)}
catch
C:E:Stacktrace ->
?tp(warning, "safe_decode_failed", #{
class => C,
exception => E,
stacktrace => Stacktrace
}),
{error, corrupted}
end.
-spec fold(foldfun(Acc), Acc, _Root :: file:name(), glob()) ->
Acc.
fold(Fun, Acc, Root, Glob) ->
fold(Fun, Acc, [], Root, Glob, []).
fold(Fun, AccIn, Path, Root, [Glob | Rest], Stack) when Glob == '*' orelse is_function(Glob) ->
case file:list_dir(filename:join(Root, Path)) of
{ok, Filenames} ->
lists:foldl(
fun(FN, Acc) ->
case matches_glob(Glob, FN) of
true when Path == [] ->
fold(Fun, Acc, FN, Root, Rest, [FN | Stack]);
true ->
fold(Fun, Acc, filename:join(Path, FN), Root, Rest, [FN | Stack]);
false ->
Acc
end
end,
AccIn,
Filenames
);
{error, enotdir} ->
AccIn;
{error, Reason} ->
Fun(Path, {error, Reason}, Stack, AccIn)
end;
fold(Fun, AccIn, Filepath, Root, [], Stack) ->
case file:read_link_info(filename:join(Root, Filepath), [{time, posix}, raw]) of
{ok, Info} ->
Fun(Filepath, Info, Stack, AccIn);
{error, Reason} ->
Fun(Filepath, {error, Reason}, Stack, AccIn)
end.
matches_glob('*', _) ->
true;
matches_glob(FilterFun, Filename) when is_function(FilterFun) ->
FilterFun(Filename).

View File

@ -66,12 +66,33 @@ fields(local_storage) ->
desc => ?DESC("local_storage_root"),
required => false
}},
{exporter, #{
type => hoconsc:union([
?REF(local_storage_exporter)
]),
desc => ?DESC("local_storage_exporter"),
required => true
}},
{gc, #{
type => hoconsc:ref(?MODULE, local_storage_gc),
type => ?REF(local_storage_gc),
desc => ?DESC("local_storage_gc"),
required => false
}}
];
fields(local_storage_exporter) ->
[
{type, #{
type => local,
default => local,
required => false,
desc => ?DESC("local_storage_exporter_type")
}},
{root, #{
type => binary(),
desc => ?DESC("local_storage_exporter_root"),
required => false
}}
];
fields(local_storage_gc) ->
[
{interval, #{
@ -101,12 +122,15 @@ desc(file_transfer) ->
"File transfer settings";
desc(local_storage) ->
"File transfer local storage settings";
desc(local_storage_exporter) ->
"Exporter settings for the File transfer local storage backend";
desc(local_storage_gc) ->
"Garbage collection settings for the File transfer local storage backend".
schema(filemeta) ->
#{
roots => [
% TODO nonempty
{name, hoconsc:mk(string(), #{required => true})},
{size, hoconsc:mk(non_neg_integer())},
{expire_at, hoconsc:mk(non_neg_integer())},

View File

@ -24,8 +24,7 @@
store_segment/2,
assemble/2,
ready_transfers/0,
get_ready_transfer/1,
exports/0,
with_storage_type/3
]
@ -34,12 +33,19 @@
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
-export_type([export_data/0]).
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
-type ready_transfer_id() :: term().
-type ready_transfer_info() :: map().
-type ready_transfer_data() :: binary() | qlc:query_handle().
-type export_info() :: #{
transfer := emqx_ft:transfer(),
name := file:name(),
size := _Bytes :: non_neg_integer(),
uri => uri_string:uri_string(),
meta => emqx_ft:filemeta()
}.
-type export_data() :: binary() | qlc:query_handle().
%%--------------------------------------------------------------------
%% Behaviour
@ -57,10 +63,9 @@
ok | {async, pid()} | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}.
-callback ready_transfers(storage()) ->
{ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
-callback get_ready_transfer(storage(), ready_transfer_id()) ->
{ok, ready_transfer_data()} | {error, term()}.
-callback exports(storage()) ->
{ok, [export_info()]} | {error, term()}.
%%--------------------------------------------------------------------
%% API
@ -95,15 +100,11 @@ assemble(Transfer, Size) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Size).
-spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
ready_transfers() ->
-spec exports() ->
{ok, [export_info()]} | {error, term()}.
exports() ->
Mod = mod(),
Mod:ready_transfers(storage()).
-spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}.
get_ready_transfer(ReadyTransferId) ->
Mod = mod(),
Mod:get_ready_transfer(storage(), ReadyTransferId).
Mod:exports(storage()).
-spec with_storage_type(atom(), atom(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->

View File

@ -0,0 +1,352 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_exporter_fs).
-include_lib("kernel/include/file.hrl").
-include_lib("emqx/include/logger.hrl").
%% Exporter API
-export([start_export/3]).
-export([write/2]).
-export([complete/1]).
-export([discard/1]).
-export([list_local/1]).
-export([list_local/2]).
-export([start_reader/3]).
-export([list/1]).
% -export([list/2]).
-export_type([export/0]).
-type options() :: _TODO.
-type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta().
-type exportinfo() :: #{
transfer := transfer(),
name := file:name(),
uri := uri_string:uri_string(),
timestamp := emqx_datetime:epoch_second(),
size := _Bytes :: non_neg_integer(),
meta => filemeta()
}.
-type file_error() :: emqx_ft_storage_fs:file_error().
-opaque export() :: #{
path := file:name(),
handle := io:device(),
result := file:name(),
meta := filemeta(),
hash := crypto:hash_state()
}.
-type reader() :: pid().
-define(TEMPDIR, "tmp").
-define(MANIFEST, ".MANIFEST.json").
%% NOTE
%% Bucketing of resulting files to accomodate the storage backend for considerably
%% large (e.g. > 10s of millions) amount of files.
-define(BUCKET_HASH, sha).
%% 2 symbols = at most 256 directories on the upper level
-define(BUCKET1_LEN, 2).
%% 2 symbols = at most 256 directories on the second level
-define(BUCKET2_LEN, 2).
-define(SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
?SLOG(notice, "filesystem_object_unexpected", #{
relpath => RelFilepath,
fileinfo => Fileinfo,
options => Options
})
).
-define(SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
?SLOG(warning, "filesystem_object_inaccessible", #{
relpath => RelFilepath,
reason => Reason,
options => Options
})
).
%%
-spec start_export(options(), transfer(), filemeta()) ->
{ok, export()} | {error, file_error()}.
start_export(Options, Transfer, Filemeta = #{name := Filename}) ->
TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename),
ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename),
_ = filelib:ensure_dir(TempFilepath),
case file:open(TempFilepath, [write, raw, binary]) of
{ok, Handle} ->
{ok, #{
path => TempFilepath,
handle => Handle,
result => ResultFilepath,
meta => Filemeta,
hash => init_checksum(Filemeta)
}};
{error, _} = Error ->
Error
end.
-spec write(export(), iodata()) ->
{ok, export()} | {error, file_error()}.
write(Export = #{handle := Handle, hash := Ctx}, IoData) ->
case file:write(Handle, IoData) of
ok ->
{ok, Export#{hash := update_checksum(Ctx, IoData)}};
{error, _} = Error ->
Error
end.
-spec complete(export()) ->
ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
complete(
Export = #{
path := Filepath,
handle := Handle,
result := ResultFilepath,
meta := FilemetaIn,
hash := Ctx
}
) ->
case verify_checksum(Ctx, FilemetaIn) of
{ok, Filemeta} ->
ok = file:close(Handle),
_ = filelib:ensure_dir(ResultFilepath),
_ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)),
file:rename(Filepath, ResultFilepath);
{error, _} = Error ->
_ = discard(Export),
Error
end.
-spec discard(export()) ->
ok.
discard(#{path := Filepath, handle := Handle}) ->
ok = file:close(Handle),
file:delete(Filepath).
%%
-spec list_local(options(), transfer()) ->
{ok, [exportinfo(), ...]} | {error, file_error()}.
list_local(Options, Transfer) ->
TransferRoot = mk_absdir(Options, Transfer, result),
case
emqx_ft_fs_util:fold(
fun
(_Path, {error, Reason}, [], []) ->
{error, Reason};
(_Path, Fileinfo = #file_info{type = regular}, [Filename | _], Acc) ->
RelFilepath = filename:join(mk_result_reldir(Transfer) ++ [Filename]),
Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
[Info | Acc];
(RelFilepath, Fileinfo = #file_info{}, _, Acc) ->
?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
Acc;
(RelFilepath, {error, Reason}, _, Acc) ->
?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
Acc
end,
[],
TransferRoot,
[fun filter_manifest/1]
)
of
Infos = [_ | _] ->
{ok, Infos};
[] ->
{error, enoent};
{error, Reason} ->
{error, Reason}
end.
-spec list_local(options()) ->
{ok, #{transfer() => [exportinfo(), ...]}}.
list_local(Options) ->
Pattern = [
_Bucket1 = '*',
_Bucket2 = '*',
_Rest = '*',
_ClientId = '*',
_FileId = '*',
fun filter_manifest/1
],
Root = get_storage_root(Options),
{ok,
emqx_ft_fs_util:fold(
fun(RelFilepath, Info, Stack, Acc) ->
read_exportinfo(Options, RelFilepath, Info, Stack, Acc)
end,
[],
Root,
Pattern
)}.
filter_manifest(?MANIFEST) ->
% Filename equals `?MANIFEST`, there should also be a manifest for it.
false;
filter_manifest(Filename) ->
?MANIFEST =/= string:find(Filename, ?MANIFEST, trailing).
read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{type = regular}, Stack, Acc) ->
[Filename, FileId, ClientId | _] = Stack,
Transfer = dirnames_to_transfer(ClientId, FileId),
Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
[Info | Acc];
read_exportinfo(Options, RelFilepath, Fileinfo = #file_info{}, _Stack, Acc) ->
?SLOG_UNEXPECTED(RelFilepath, Fileinfo, Options),
Acc;
read_exportinfo(Options, RelFilepath, {error, Reason}, _Stack, Acc) ->
?SLOG_INACCESSIBLE(RelFilepath, Reason, Options),
Acc.
mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo) ->
Root = get_storage_root(Options),
try_read_filemeta(
filename:join(Root, mk_manifest_filename(RelFilepath)),
#{
transfer => Transfer,
name => Filename,
uri => mk_export_uri(Options, RelFilepath),
timestamp => Fileinfo#file_info.mtime,
size => Fileinfo#file_info.size,
path => filename:join(Root, RelFilepath)
}
).
try_read_filemeta(Filepath, Info) ->
case emqx_ft_fs_util:read_decode_file(Filepath, fun decode_filemeta/1) of
{ok, Filemeta} ->
Info#{meta => Filemeta};
{error, Reason} ->
?SLOG(warning, "filemeta_inaccessible", #{
path => Filepath,
reason => Reason
}),
Info
end.
mk_export_uri(Options, RelFilepath) ->
% emqx_ft_storage_exporter_fs_api:mk_export_uri(Options, RelFilepath).
emqx_ft_api:mk_file_uri(Options, node(), RelFilepath).
-spec start_reader(options(), file:name(), _Caller :: pid()) ->
{ok, reader()} | {error, enoent}.
start_reader(Options, Filepath, CallerPid) ->
Root = get_storage_root(Options),
case filelib:safe_relative_path(Filepath, Root) of
SafeFilepath when SafeFilepath /= unsafe ->
AbsFilepath = filename:join(Root, SafeFilepath),
emqx_ft_storage_fs_reader:start_supervised(CallerPid, AbsFilepath);
unsafe ->
{error, enoent}
end.
%%
-spec list(options()) ->
{ok, [exportinfo(), ...]} | {error, file_error()}.
list(_Options) ->
Nodes = mria_mnesia:running_nodes(),
Results = emqx_ft_storage_fs_proto_v1:list_exports(Nodes),
{GoodResults, BadResults} = lists:partition(
fun
({_Node, {ok, {ok, _}}}) -> true;
(_) -> false
end,
lists:zip(Nodes, Results)
),
length(BadResults) > 0 andalso
?SLOG(warning, #{msg => "list_remote_exports_failed", failures => BadResults}),
{ok, [File || {_Node, {ok, {ok, Files}}} <- GoodResults, File <- Files]}.
%%
init_checksum(#{checksum := {Algo, _}}) ->
crypto:hash_init(Algo);
init_checksum(#{}) ->
crypto:hash_init(sha256).
update_checksum(Ctx, IoData) ->
crypto:hash_update(Ctx, IoData).
verify_checksum(Ctx, Filemeta = #{checksum := {Algo, Digest}}) ->
case crypto:hash_final(Ctx) of
Digest ->
{ok, Filemeta};
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
verify_checksum(Ctx, Filemeta = #{}) ->
Digest = crypto:hash_final(Ctx),
{ok, Filemeta#{checksum => {sha256, Digest}}}.
%%
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
encode_filemeta(Meta) ->
emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
decode_filemeta(Binary) when is_binary(Binary) ->
?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
case emqx_ft:decode_filemeta(Map) of
{ok, Meta} ->
Meta;
{error, Reason} ->
error(Reason)
end.
mk_manifest_filename(Filename) when is_list(Filename) ->
Filename ++ ?MANIFEST;
mk_manifest_filename(Filename) when is_binary(Filename) ->
<<Filename/binary, ?MANIFEST>>.
mk_temp_absfilepath(Options, Transfer, Filename) ->
Unique = erlang:unique_integer([positive]),
TempFilename = integer_to_list(Unique) ++ "." ++ Filename,
filename:join(mk_absdir(Options, Transfer, temporary), TempFilename).
mk_absdir(Options, _Transfer, temporary) ->
filename:join([get_storage_root(Options), ?TEMPDIR]);
mk_absdir(Options, Transfer, result) ->
filename:join([get_storage_root(Options) | mk_result_reldir(Transfer)]).
mk_absfilepath(Options, Transfer, What, Filename) ->
filename:join(mk_absdir(Options, Transfer, What), Filename).
mk_result_reldir(Transfer = {ClientId, FileId}) ->
Hash = mk_transfer_hash(Transfer),
<<
Bucket1:?BUCKET1_LEN/binary,
Bucket2:?BUCKET2_LEN/binary,
BucketRest/binary
>> = binary:encode_hex(Hash),
[Bucket1, Bucket2, BucketRest, ClientId, FileId].
mk_transfer_hash(Transfer) ->
crypto:hash(?BUCKET_HASH, term_to_binary(Transfer)).
get_storage_root(Options) ->
maps:get(root, Options, filename:join([emqx:data_dir(), "ft", "exports"])).

View File

@ -29,6 +29,7 @@
-export([child_spec/1]).
% Segments-related API
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([read_filemeta/2]).
@ -43,22 +44,20 @@
-export([get_subdir/2]).
-export([get_subdir/3]).
-export([ready_transfers_local/1]).
-export([get_ready_transfer_local/3]).
-export([exporter/1]).
-export([ready_transfers/1]).
-export([get_ready_transfer/2]).
-export([open_file/3]).
-export([complete/4]).
-export([write/2]).
-export([discard/1]).
% Exporter-specific API
-export([exports/1]).
-export([exports_local/1]).
-export([exports_local/2]).
-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
-export_type([file_error/0]).
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().
@ -70,8 +69,7 @@
}.
-type transferinfo() :: #{
status := complete | incomplete,
result => [filefrag({result, #{}})]
filemeta => filemeta()
}.
% TODO naming
@ -85,16 +83,20 @@
-type filefrag() :: filefrag(
{filemeta, filemeta()}
| {segment, segmentinfo()}
| {result, #{}}
).
-define(FRAGDIR, frags).
-define(TEMPDIR, tmp).
-define(RESULTDIR, result).
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").
-type storage() :: #{
root => file:name(),
exporter => exporter()
}.
-type exporter() :: #{
type := local,
root => file:name()
}.
@ -138,7 +140,9 @@ store_filemeta(Storage, Transfer, Meta) ->
% about it too much now.
{error, conflict};
{error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta))
write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
{error, _} = Error ->
Error
end.
%% Store a segment in the backing filesystem.
@ -153,17 +157,17 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
write_file_atomic(Storage, Transfer, Filepath, Content).
-spec read_filemeta(storage(), transfer()) ->
{ok, filefrag({filemeta, filemeta()})} | {error, corrupted} | {error, file_error()}.
{ok, filemeta()} | {error, corrupted} | {error, file_error()}.
read_filemeta(Storage, Transfer) ->
Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
read_file(Filepath, fun decode_filemeta/1).
-spec list(storage(), transfer(), _What :: fragment | result) ->
-spec list(storage(), transfer(), _What :: fragment) ->
% Some lower level errors? {error, notfound}?
% Result will contain zero or only one filemeta.
{ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
| {error, file_error()}.
list(Storage, Transfer, What) ->
list(Storage, Transfer, What = fragment) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
case file:list_dir(Dirname) of
{ok, Filenames} ->
@ -172,18 +176,13 @@ list(Storage, Transfer, What) ->
% extremely bad luck is needed for that, e.g. concurrent assemblers with
% different filemetas from different nodes). This might be unexpected for a
% client given the current protocol, yet might be helpful in the future.
{ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
{ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
{error, enoent} ->
{ok, []};
{error, _} = Error ->
Error
end.
get_filefrag_fun_for(fragment) ->
fun mk_filefrag/2;
get_filefrag_fun_for(result) ->
fun mk_result_filefrag/2.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
@ -213,102 +212,28 @@ assemble(Storage, Transfer, Size) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
{async, Pid}.
get_ready_transfer(_Storage, ReadyTransferId) ->
case parse_ready_transfer_id(ReadyTransferId) of
{ok, {Node, Transfer}} ->
try
case emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, self(), Transfer) of
{ok, ReaderPid} ->
{ok, emqx_ft_storage_fs_reader:table(ReaderPid)};
{error, _} = Error ->
Error
end
catch
error:Exc:Stacktrace ->
?SLOG(warning, #{
msg => "get_ready_transfer_error",
node => Node,
transfer => Transfer,
exception => Exc,
stacktrace => Stacktrace
}),
{error, Exc};
C:Exc:Stacktrace ->
?SLOG(warning, #{
msg => "get_ready_transfer_fail",
class => C,
node => Node,
transfer => Transfer,
exception => Exc,
stacktrace => Stacktrace
}),
{error, {C, Exc}}
end;
{error, _} = Error ->
Error
%%
-spec exporter(storage()) -> {module(), _ExporterOptions}.
exporter(Storage) ->
case maps:get(exporter, Storage) of
#{type := local} = Options ->
{emqx_ft_storage_exporter_fs, Options}
end.
get_ready_transfer_local(Storage, CallerPid, Transfer) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
case file:list_dir(Dirname) of
{ok, [Filename | _]} ->
FullFilename = filename:join([Dirname, Filename]),
emqx_ft_storage_fs_reader:start_supervised(CallerPid, FullFilename);
{error, _} = Error ->
Error
end.
exports(Storage) ->
{ExporterMod, ExporterOpts} = exporter(Storage),
ExporterMod:list(ExporterOpts).
ready_transfers(_Storage) ->
Nodes = mria_mnesia:running_nodes(),
Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes),
{GoodResults, BadResults} = lists:partition(
fun
({ok, _}) -> true;
(_) -> false
end,
Results
),
case {GoodResults, BadResults} of
{[], _} ->
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{error, no_nodes};
{_, []} ->
{ok, [File || {ok, Files} <- GoodResults, File <- Files]};
{_, _} ->
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{ok, [File || {ok, Files} <- GoodResults, File <- Files]}
end.
exports_local(Storage) ->
{ExporterMod, ExporterOpts} = exporter(Storage),
ExporterMod:list_local(ExporterOpts).
ready_transfers_local(Storage) ->
{ok, Transfers} = transfers(Storage),
lists:filtermap(
fun
({Transfer, #{status := complete, result := [Result | _]}}) ->
{true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}};
(_) ->
false
end,
maps:to_list(Transfers)
).
exports_local(Storage, Transfer) ->
{ExporterMod, ExporterOpts} = exporter(Storage),
ExporterMod:list_local(ExporterOpts, Transfer).
ready_transfer_id({ClientId, FileId}) ->
#{
<<"node">> => atom_to_binary(node()),
<<"clientid">> => ClientId,
<<"fileid">> => FileId
}.
parse_ready_transfer_id(#{
<<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId
}) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
{ok, {Node, {ClientId, FileId}}};
{error, _} ->
{error, {invalid_node, NodeBin}}
end;
parse_ready_transfer_id(#{}) ->
{error, invalid_file_id}.
%%
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
@ -345,17 +270,16 @@ transfers(Storage, ClientId, AccIn) ->
end.
read_transferinfo(Storage, Transfer, Acc) ->
case list(Storage, Transfer, result) of
{ok, Result = [_ | _]} ->
Info = #{status => complete, result => Result},
Acc#{Transfer => Info};
{ok, []} ->
Info = #{status => incomplete},
Acc#{Transfer => Info};
{error, _Reason} ->
?tp(warning, "list_result_failed", #{
case read_filemeta(Storage, Transfer) of
{ok, Filemeta} ->
Acc#{Transfer => #{filemeta => Filemeta}};
{error, enoent} ->
Acc#{Transfer => #{}};
{error, Reason} ->
?tp(warning, "read_transferinfo_failed", #{
storage => Storage,
transfer => Transfer
transfer => Transfer,
reason => Reason
}),
Acc
end.
@ -365,7 +289,7 @@ read_transferinfo(Storage, Transfer, Acc) ->
get_subdir(Storage, Transfer) ->
mk_filedir(Storage, Transfer, []).
-spec get_subdir(storage(), transfer(), fragment | temporary | result) ->
-spec get_subdir(storage(), transfer(), fragment | temporary) ->
file:name().
get_subdir(Storage, Transfer, What) ->
mk_filedir(Storage, Transfer, get_subdirs_for(What)).
@ -373,84 +297,12 @@ get_subdir(Storage, Transfer, What) ->
get_subdirs_for(fragment) ->
[?FRAGDIR];
get_subdirs_for(temporary) ->
[?TEMPDIR];
get_subdirs_for(result) ->
[?RESULTDIR].
%%
-type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-spec open_file(storage(), transfer(), filemeta()) ->
{ok, handle()} | {error, file_error()}.
open_file(Storage, Transfer, Filemeta) ->
Filename = maps:get(name, Filemeta),
TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
_ = filelib:ensure_dir(TempFilepath),
case file:open(TempFilepath, [write, raw, binary]) of
{ok, Handle} ->
% TODO: preserve filemeta
{ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
{error, _} = Error ->
Error
end.
-spec write(handle(), iodata()) ->
{ok, handle()} | {error, file_error()}.
write({Filepath, IoDevice, Ctx}, IoData) ->
case file:write(IoDevice, IoData) of
ok ->
{ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}};
{error, _} = Error ->
Error
end.
-spec complete(storage(), transfer(), filemeta(), handle()) ->
ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
complete(Storage, Transfer, Filemeta = #{name := Filename}, Handle = {Filepath, IoDevice, Ctx}) ->
TargetFilepath = mk_filepath(Storage, Transfer, get_subdirs_for(result), Filename),
case verify_checksum(Ctx, Filemeta) of
ok ->
ok = file:close(IoDevice),
mv_temp_file(Filepath, TargetFilepath);
{error, _} = Error ->
_ = discard(Handle),
Error
end.
-spec discard(handle()) ->
ok.
discard({Filepath, IoDevice, _Ctx}) ->
ok = file:close(IoDevice),
file:delete(Filepath).
init_checksum(#{checksum := {Algo, _}}) ->
crypto:hash_init(Algo);
init_checksum(#{}) ->
undefined.
update_checksum(Ctx, IoData) when Ctx /= undefined ->
crypto:hash_update(Ctx, IoData);
update_checksum(undefined, _IoData) ->
undefined.
verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
case crypto:hash_final(Ctx) of
Digest ->
ok;
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
verify_checksum(undefined, _) ->
ok.
[?TEMPDIR].
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
encode_filemeta(Meta) ->
% TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta),
Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}),
emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
decode_filemeta(Binary) when is_binary(Binary) ->
?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
@ -490,33 +342,12 @@ try_list_dir(Dirname) ->
end.
get_storage_root(Storage) ->
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
maps:get(root, Storage, filename:join([emqx:data_dir(), "ft", "transfers"])).
-include_lib("kernel/include/file.hrl").
read_file(Filepath) ->
file:read_file(Filepath).
read_file(Filepath, DecodeFun) ->
case read_file(Filepath) of
{ok, Content} ->
safe_decode(Content, DecodeFun);
{error, _} = Error ->
Error
end.
safe_decode(Content, DecodeFun) ->
try
{ok, DecodeFun(Content)}
catch
C:E:Stacktrace ->
?tp(warning, "safe_decode_failed", #{
class => C,
exception => E,
stacktrace => Stacktrace
}),
{error, corrupted}
end.
emqx_ft_fs_util:read_decode_file(Filepath, DecodeFun).
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
@ -574,12 +405,6 @@ mk_filefrag(_Dirname, _Filename) ->
}),
false.
mk_result_filefrag(Dirname, Filename) ->
% NOTE
% Any file in the `?RESULTDIR` subdir is currently considered the result of
% the file transfer.
mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
mk_filefrag(Dirname, Filename, Tag, Fun) ->
Filepath = filename:join(Dirname, Filename),
% TODO error handling?

View File

@ -308,28 +308,18 @@ is_same_filepath(P1, P2) when is_binary(P1) ->
filepath_to_binary(S) ->
unicode:characters_to_binary(S, unicode, file:native_name_encoding()).
get_segments_ttl(Storage, Transfer) ->
get_segments_ttl(Storage, TransferInfo) ->
{MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(Storage, Transfer)).
clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)).
try_get_filemeta_ttl(Storage, Transfer) ->
case emqx_ft_storage_fs:read_filemeta(Storage, Transfer) of
{ok, Filemeta} ->
maps:get(segments_ttl, Filemeta, undefined);
{error, _} ->
undefined
end.
try_get_filemeta_ttl(#{filemeta := Filemeta}) ->
maps:get(segments_ttl, Filemeta, undefined);
try_get_filemeta_ttl(#{}) ->
undefined.
clamp(Min, Max, V) ->
min(Max, max(Min, V)).
% try_collect(_Subject, ok = Result, Then, _Stats) ->
% Then(Result);
% try_collect(_Subject, {ok, Result}, Then, _Stats) ->
% Then(Result);
% try_collect(Subject, {error, _} = Error, _Then, Stats) ->
% register_gcstat_error(Subject, Error, Stats).
%%
init_gcstats() ->

View File

@ -23,8 +23,8 @@
-export([
list_local/2,
pread_local/4,
get_ready_transfer_local/2,
ready_transfers_local/0
list_exports_local/0,
read_export_file_local/2
]).
list_local(Transfer, What) ->
@ -33,8 +33,18 @@ list_local(Transfer, What) ->
pread_local(Transfer, Frag, Offset, Size) ->
emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
get_ready_transfer_local(CallerPid, Transfer) ->
emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [CallerPid, Transfer]).
list_exports_local() ->
case emqx_ft_storage:with_storage_type(local, exporter, []) of
{emqx_ft_storage_exporter_fs, Options} ->
emqx_ft_storage_exporter_fs:list_local(Options);
InvalidExporter ->
{error, {invalid_exporter, InvalidExporter}}
end.
ready_transfers_local() ->
emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).
read_export_file_local(Filepath, CallerPid) ->
case emqx_ft_storage:with_storage_type(local, exporter, []) of
{emqx_ft_storage_exporter_fs, Options} ->
emqx_ft_storage_exporter_fs:start_reader(Options, Filepath, CallerPid);
InvalidExporter ->
{error, {invalid_exporter, InvalidExporter}}
end.

View File

@ -22,8 +22,10 @@
-export([multilist/3]).
-export([pread/5]).
-export([ready_transfers/1]).
-export([get_ready_transfer/3]).
%% TODO: These should be defined in a separate BPAPI
-export([list_exports/1]).
-export([read_export_file/3]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
@ -44,19 +46,16 @@ multilist(Nodes, Transfer, What) ->
pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
-spec ready_transfers([node()]) ->
[
{ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
| {error, term()}
| {exit, term()}
| {throw, term()}
].
ready_transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).
%%
-spec get_ready_transfer(node(), pid(), emqx_ft_storage:ready_transfer_id()) ->
{ok, emqx_ft_storage:ready_transfer_data()}
-spec list_exports([node()]) ->
emqx_rpc:erpc_multicall([emqx_ft_storage:export_info()]).
list_exports(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, list_exports_local, []).
-spec read_export_file(node(), file:name(), pid()) ->
{ok, emqx_ft_storage:export_data()}
| {error, term()}
| no_return().
get_ready_transfer(Node, CallerPid, ReadyTransferId) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, get_ready_transfer_local, [CallerPid, ReadyTransferId]).
read_export_file(Node, Filepath, CallerPid) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, read_export_file_local, [Filepath, CallerPid]).

View File

@ -58,8 +58,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
Root = emqx_ft_test_helpers:ft_root(Config, node()),
emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end.
@ -108,8 +109,9 @@ mk_cluster_specs(Config) ->
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
{env_handler, fun
(emqx_ft) ->
Root = emqx_ft_test_helpers:ft_root(Config, node()),
emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end}
@ -194,11 +196,10 @@ t_simple_transfer(Config) ->
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
),
[ReadyTransferId] = list_ready_transfers(?config(clientid, Config)),
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
[Export] = list_exports(?config(clientid, Config)),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(qlc:eval(TableQH))
{ok, iolist_to_binary(Data)},
read_export(Export)
).
t_meta_conflict(Config) ->
@ -424,11 +425,10 @@ t_switch_node(Config) ->
%% Now check consistency of the file
[ReadyTransferId] = list_ready_transfers(ClientId),
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
[Export] = list_exports(ClientId),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(qlc:eval(TableQH))
{ok, iolist_to_binary(Data)},
read_export(Export)
).
t_assemble_crash(Config) ->
@ -501,27 +501,30 @@ t_unreliable_migrating_client(Config) ->
],
_Context = run_commands(Commands, Context),
ReadyTransferIds = list_ready_transfers(?config(clientid, Config)),
Exports = list_exports(?config(clientid, Config)),
% NOTE
% The cluster had 2 assemblers running on two different nodes, because client sent `fin`
% twice. This is currently expected, files must be identical anyway.
Node1Bin = atom_to_binary(Node1),
NodeSelfBin = atom_to_binary(NodeSelf),
Node1Str = atom_to_list(Node1),
NodeSelfStr = atom_to_list(NodeSelf),
?assertMatch(
[#{<<"node">> := Node1Bin}, #{<<"node">> := NodeSelfBin}],
lists:sort(ReadyTransferIds)
[#{"node" := Node1Str}, #{"node" := NodeSelfStr}],
lists:map(
fun(#{uri := URIString}) ->
#{query := QS} = uri_string:parse(URIString),
uri_string:dissect_query(QS)
end,
lists:sort(Exports)
)
),
[
begin
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(Id),
?assertEqual(
Payload,
iolist_to_binary(qlc:eval(TableQH))
)
end
|| Id <- ReadyTransferIds
?assertEqual(
{ok, Payload},
read_export(Export)
)
|| Export <- Exports
].
run_commands(Commands, Context) ->
@ -620,10 +623,10 @@ meta(FileName, Data) ->
size => byte_size(FullData)
}.
list_ready_transfers(ClientId) ->
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
[
Id
|| {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers,
CId == ClientId
].
list_exports(ClientId) ->
{ok, Exports} = emqx_ft_storage:exports(),
[Export || Export = #{transfer := {CId, _}} <- Exports, CId == ClientId].
read_export(#{path := AbsFilepath}) ->
% TODO: only works for the local filesystem exporter right now
file:read_file(AbsFilepath).

View File

@ -24,7 +24,7 @@
-include_lib("emqx/include/asserts.hrl").
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-import(emqx_mgmt_api_test_util, [uri/1]).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -41,8 +41,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
Root = emqx_ft_test_helpers:ft_root(Config, node()),
emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end.
@ -61,40 +62,25 @@ t_list_ready_transfers(Config) ->
ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),
{ok, 200, Response} = request(get, uri(["file_transfer", "files"])),
#{<<"files">> := Files} = emqx_json:decode(Response, [return_maps]),
{ok, 200, #{<<"files">> := Files}} =
request(get, uri(["file_transfer", "files"]), fun json/1),
?assertInclude(
#{<<"id">> := #{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}},
#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>},
Files
).
%% This shouldn't happen in real life
%% but we need to test it anyway
t_list_ready_transfers_no_nodes(_Config) ->
_ = meck:new(mria_mnesia, [passthrough]),
_ = meck:expect(mria_mnesia, running_nodes, fun() -> [] end),
?assertMatch(
{ok, 503, _},
request(get, uri(["file_transfer", "files"]))
).
t_download_transfer(Config) ->
ClientId = client_id(Config),
ok = emqx_ft_test_helpers:upload_file(ClientId, <<"f1">>, <<"data">>, node()),
?assertMatch(
{ok, 503, _},
{ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>
})
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}),
fun json/1
)
),
@ -104,8 +90,7 @@ t_download_transfer(Config) ->
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>,
fileref => <<"f1">>,
node => <<"nonode@nohost">>
})
)
@ -117,22 +102,16 @@ t_download_transfer(Config) ->
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"unknown_file">>,
fileref => <<"unknown_file">>,
node => node()
})
)
),
{ok, 200, Response} = request(
get,
uri(["file_transfer", "file"]) ++
query(#{
clientid => ClientId,
fileid => <<"f1">>,
node => node()
})
),
{ok, 200, #{<<"files">> := [File]}} =
request(get, uri(["file_transfer", "files"]), fun json/1),
{ok, 200, Response} = request(get, uri([]) ++ maps:get(<<"uri">>, File)),
?assertEqual(
<<"data">>,
@ -147,7 +126,18 @@ client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8).
request(Method, Url) ->
request(Method, Url, []).
emqx_mgmt_api_test_util:request(Method, Url, []).
request(Method, Url, Decoder) when is_function(Decoder) ->
case emqx_mgmt_api_test_util:request(Method, Url, []) of
{ok, Code, Body} ->
{ok, Code, Decoder(Body)};
Otherwise ->
Otherwise
end.
json(Body) when is_binary(Body) ->
emqx_json:decode(Body, [return_maps]).
query(Params) ->
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),

View File

@ -47,6 +47,7 @@ init_per_testcase(TC, Config) ->
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
{storage_root, "file_transfer_root"},
{exports_root, "file_transfer_exports"},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
@ -85,11 +86,12 @@ t_assemble_empty_transfer(Config) ->
),
Status = complete_assemble(Storage, Transfer, 0),
?assertEqual({shutdown, ok}, Status),
{ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
?assertEqual(
{error, eof},
emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
),
{ok, [_Result = #{size := _Size = 0}]} =
emqx_ft_storage_fs:exports_local(Storage, Transfer),
% ?assertEqual(
% {error, eof},
% emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
% ),
ok.
t_assemble_complete_local_transfer(Config) ->
@ -133,12 +135,13 @@ t_assemble_complete_local_transfer(Config) ->
{ok, [
#{
size := TransferSize,
fragment := {result, #{}}
meta := #{}
}
]},
emqx_ft_storage_fs:list(Storage, Transfer, result)
emqx_ft_storage_fs:exports_local(Storage, Transfer)
),
{ok, [#{path := AssemblyFilename}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
{ok, [#{path := AssemblyFilename}]} =
emqx_ft_storage_fs:exports_local(Storage, Transfer),
?assertMatch(
{ok, #file_info{type = regular, size = TransferSize}},
file:read_file_info(AssemblyFilename)
@ -191,18 +194,23 @@ complete_assemble(Storage, Transfer, Size, Timeout) ->
t_list_transfers(Config) ->
Storage = storage(Config),
{ok, Exports} = emqx_ft_storage_fs:exports_local(Storage),
?assertMatch(
{ok, #{
{?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
status := complete,
result := [#{path := _, size := 0, fragment := {result, _}}]
[
#{
transfer := {?CLIENTID2, <<"t_assemble_complete_local_transfer">>},
path := _,
size := Size,
meta := #{name := "topsecret.pdf"}
},
{?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
status := complete,
result := [#{path := _, size := Size, fragment := {result, _}}]
#{
transfer := {?CLIENTID1, <<"t_assemble_empty_transfer">>},
path := _,
size := 0,
meta := #{name := "important.pdf"}
}
}} when Size > 0,
emqx_ft_storage_fs:transfers(Storage)
] when Size > 0,
lists:sort(Exports)
).
%%
@ -232,5 +240,9 @@ mk_fileid() ->
storage(Config) ->
#{
type => local,
root => ?config(storage_root, Config)
root => ?config(storage_root, Config),
exporter => #{
type => local,
root => ?config(exports_root, Config)
}
}.

View File

@ -56,7 +56,16 @@ t_update_config(_Config) ->
{ok, _},
emqx_conf:update(
[file_transfer],
#{<<"storage">> => #{<<"type">> => <<"local">>, <<"root">> => <<"/tmp/path">>}},
#{
<<"storage">> => #{
<<"type">> => <<"local">>,
<<"root">> => <<"/tmp/path">>,
<<"exporter">> => #{
<<"type">> => <<"local">>,
<<"root">> => <<"/tmp/exports">>
}
}
},
#{}
)
),

View File

@ -22,11 +22,8 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("emqx/include/asserts.hrl").
all() ->
[
{group, single_node},
{group, cluster}
].
@ -34,7 +31,6 @@ all() ->
groups() ->
[
{single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- ?CLUSTER_CASES},
{cluster, [sequence], ?CLUSTER_CASES}
].
@ -48,8 +44,9 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
Root = emqx_ft_test_helpers:ft_root(Config, node()),
emqx_ft_test_helpers:load_config(#{storage => #{type => local, root => Root}});
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end.
@ -74,47 +71,19 @@ end_per_group(_Group, _Config) ->
%% Tests
%%--------------------------------------------------------------------
t_invalid_ready_transfer_id(Config) ->
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"fileid">>,
<<"node">> => atom_to_binary('nonexistent@127.0.0.1')
})
),
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"fileid">>,
<<"node">> => <<"nonexistent_as_atom@127.0.0.1">>
})
),
?assertMatch(
{error, _},
emqx_ft_storage_fs:get_ready_transfer(storage(Config), #{
<<"clientid">> => client_id(Config),
<<"fileid">> => <<"nonexistent_file">>,
<<"node">> => node()
})
).
t_multinode_ready_transfers(Config) ->
Node1 = ?config(additional_node, Config),
ok = emqx_ft_test_helpers:upload_file(<<"c1">>, <<"f1">>, <<"data">>, Node1),
ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1),
Node2 = node(),
ok = emqx_ft_test_helpers:upload_file(<<"c2">>, <<"f2">>, <<"data">>, Node2),
ok = emqx_ft_test_helpers:upload_file(<<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2),
?assertInclude(
#{<<"clientid">> := <<"c1">>, <<"fileid">> := <<"f1">>},
ready_transfer_ids(Config)
),
?assertInclude(
#{<<"clientid">> := <<"c2">>, <<"fileid">> := <<"f2">>},
ready_transfer_ids(Config)
?assertMatch(
[
#{transfer := {<<"c/1">>, <<"f:1">>}, name := "fn1"},
#{transfer := {<<"c/2">>, <<"f:2">>}, name := "fn2"}
],
lists:sort(list_exports(Config))
).
%%--------------------------------------------------------------------
@ -127,13 +96,13 @@ client_id(Config) ->
storage(Config) ->
#{
type => local,
root => ft_root(Config)
root => emqx_ft_test_helpers:root(Config, node(), ["transfers"]),
exporter => #{
type => local,
root => emqx_ft_test_helpers:root(Config, node(), ["exports"])
}
}.
ft_root(Config) ->
emqx_ft_test_helpers:ft_root(Config, node()).
ready_transfer_ids(Config) ->
{ok, ReadyTransfers} = emqx_ft_storage_fs:ready_transfers(storage(Config)),
{ReadyTransferIds, _} = lists:unzip(ReadyTransfers),
ReadyTransferIds.
list_exports(Config) ->
{ok, Exports} = emqx_ft_storage_fs:exports(storage(Config)),
Exports.

View File

@ -41,7 +41,14 @@ init_per_testcase(TC, Config) ->
emqx_ft,
fun(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
storage => #{type => local, root => mk_root(TC, Config)}
storage => #{
type => local,
root => emqx_ft_test_helpers:root(Config, node(), [TC, transfers]),
exporter => #{
type => local,
root => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
}
}
})
end
),
@ -53,9 +60,6 @@ end_per_testcase(_TC, _Config) ->
ok = application:stop(emqx_ft),
ok.
mk_root(TC, Config) ->
filename:join([?config(priv_dir, Config), "file_transfer", TC, atom_to_list(node())]).
%%
-define(NSEGS(Filesize, SegmentSize), (ceil(Filesize / SegmentSize) + 1)).

View File

@ -30,7 +30,7 @@ start_additional_node(Config, Name) ->
{configure_gen_rpc, true},
{env_handler, fun
(emqx_ft) ->
load_config(#{storage => #{type => local, root => ft_root(Config, node())}});
load_config(#{storage => local_storage(Config)});
(_) ->
ok
end}
@ -43,6 +43,13 @@ stop_additional_node(Node) ->
ok = emqx_common_test_helpers:stop_slave(Node),
ok.
local_storage(Config) ->
#{
type => local,
root => root(Config, node(), [transfers]),
exporter => #{type => local, root => root(Config, node(), [exports])}
}.
load_config(Config) ->
emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}).
@ -50,10 +57,8 @@ tcp_port(Node) ->
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
Port.
ft_root(Config, Node) ->
filename:join([
?config(priv_dir, Config), <<"file_transfer">>, atom_to_binary(Node)
]).
root(Config, Node, Tail) ->
filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
upload_file(ClientId, FileId, Data, Node) ->
Port = tcp_port(Node),