Merge pull request #9900 from keynslug/file-transfer
test(ft): add some basic assembler tests
commit af4360f5c5
@@ -66,7 +66,8 @@
     %% TTL of individual segments
     %% Somewhat confusing that we won't know it on the nodes where the filemeta
     %% is missing.
-    segments_ttl => _Seconds :: pos_integer()
+    segments_ttl => _Seconds :: pos_integer(),
+    user_data => emqx_ft_schema:json_value()
 }.

 -type segment() :: {offset(), _Content :: binary()}.
@@ -66,7 +66,7 @@ init({Storage, Transfer, Callback}) ->

 handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
     % TODO: what we do with non-transients errors here (e.g. `eacces`)?
-    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
+    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
     NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
     NSt = St#st{assembly = NAsm},
     case emqx_ft_assembly:status(NAsm) of

@@ -80,13 +80,11 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
         % {stop, Reason}
     end;
 handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
-    % TODO: portable "storage" ref
-    Args = [St#st.storage, St#st.transfer],
     % TODO
     % Async would better because we would not need to wait for some lagging nodes if
     % the coverage is already complete.
-    % TODO: BP API?
-    Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT),
+    % TODO: portable "storage" ref
+    Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment),
     NodeResults = lists:zip(Nodes, Results),
     NAsm = emqx_ft_assembly:update(
         lists:foldl(

@@ -119,9 +117,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
     % TODO
     % Currently, race is possible between getting segment info from the remote node and
     % this node garbage collecting the segment itself.
-    Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
     % TODO: pipelining
-    case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of
+    case pread(Node, Segment, St) of
         {ok, Content} ->
             {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
             {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}

@@ -158,6 +155,11 @@ handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, call
 % handle_cast(_Cast, St) ->
 % {noreply, St}.

+pread(Node, Segment, St) when Node =:= node() ->
+    emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
+pread(Node, Segment, St) ->
+    emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)).
+
 %%

 segsize(#{fragment := {segment, Info}}) ->
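For orientation, the assembler's internal event flow as it reads after this change, sketched from the `handle_event/4` clauses above (the `complete` step is inferred from the last hunk's context line):

    %% list_local_fragments
    %%     emqx_ft_storage_fs:list(Storage, Transfer, fragment) on the assembling node
    %% {list_remote_fragments, Nodes}
    %%     emqx_ft_storage_fs_proto_v1:multilist(Nodes, Transfer, fragment),
    %%     results paired with Nodes via lists:zip/2
    %% {assemble, [{Node, Segment} | Rest]}
    %%     pread(Node, Segment, St): local emqx_ft_storage_fs:pread/5 when Node =:= node(),
    %%     emqx_ft_storage_fs_proto_v1:pread/5 otherwise
    %% complete
    %%     the assembled file is finalized, presumably via emqx_ft_storage_fs:complete/4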
@@ -23,6 +23,20 @@

 -export([namespace/0, roots/0, fields/1, tags/0]).

+-export([schema/1]).
+
+-type json_value() ::
+    null
+    | boolean()
+    | binary()
+    | number()
+    | [json_value()]
+    | #{binary() => json_value()}.
+
+-reflect_type([json_value/0]).
+
+%%
+
 namespace() -> file_transfer.

 tags() ->

@@ -47,3 +61,29 @@ fields(local_storage) ->
                 desc => ?DESC("local")
             }}
     ].
+
+schema(filemeta) ->
+    #{
+        roots => [
+            {name, hoconsc:mk(string(), #{required => true})},
+            {size, hoconsc:mk(non_neg_integer())},
+            {expire_at, hoconsc:mk(non_neg_integer())},
+            {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
+            {segments_ttl, hoconsc:mk(pos_integer())},
+            {user_data, hoconsc:mk(json_value())}
+        ]
+    }.
+
+converter(checksum) ->
+    fun
+        (undefined, #{}) ->
+            undefined;
+        ({sha256, Bin}, #{make_serializable := true}) ->
+            _ = is_binary(Bin) orelse throw({expected_type, string}),
+            _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
+            binary:encode_hex(Bin);
+        (Hex, #{}) ->
+            _ = is_binary(Hex) orelse throw({expected_type, string}),
+            _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
+            {sha256, binary:decode_hex(Hex)}
+    end.
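A worked example of what this schema plus the checksum converter produce; the concrete values and the JSON layout shown are illustrative (the actual wrapping is done by the `?PRELUDE`-based encoder in emqx_ft_storage_fs, changed further down in this diff):

    %% In-memory filemeta: atom keys, checksum carried as a raw 32-byte SHA-256.
    %%   #{
    %%       name => "important.pdf",
    %%       size => 12345,
    %%       checksum => {sha256, <<_:32/binary>>},
    %%       segments_ttl => 600,
    %%       user_data => #{<<"author">> => <<"someone">>}
    %%   }
    %% Serialized form: the converter hex-encodes the checksum into 64 characters,
    %% and the prelude tags the payload with a version:
    %%   ["filemeta", 1, {"name": "important.pdf", "size": 12345,
    %%                    "checksum": "<64 hex chars>",
    %%                    "segments_ttl": 600, "user_data": {"author": "someone"}}]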
@@ -24,6 +24,14 @@
     ]
 ).

+-export([list_local/2]).
+-export([pread_local/4]).
+
+-export([local_transfers/0]).
+
+-type offset() :: emqx_ft:offset().
+-type transfer() :: emqx_ft:transfer().
+
 -type storage() :: emqx_config:config().

 -export_type([assemble_callback/0]).

@@ -63,8 +71,41 @@ assemble(Transfer, Callback) ->
     Mod = mod(),
     Mod:assemble(storage(), Transfer, Callback).

+%%--------------------------------------------------------------------
+%% Local FS API
+%%--------------------------------------------------------------------
+
+-type filefrag() :: emqx_ft_storage_fs:filefrag().
+-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
+
+-spec list_local(transfer(), fragment | result) ->
+    {ok, [filefrag()]} | {error, term()}.
+list_local(Transfer, What) ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:list(Storage, Transfer, What) end
+    ).
+
+-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
+    {ok, [filefrag()]} | {error, term()}.
+pread_local(Transfer, Frag, Offset, Size) ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:pread(Storage, Transfer, Frag, Offset, Size) end
+    ).
+
+-spec local_transfers() ->
+    {ok, node(), #{transfer() => transferinfo()}} | {error, term()}.
+local_transfers() ->
+    with_local_storage(
+        fun(Mod, Storage) -> Mod:transfers(Storage) end
+    ).
+
+%%
+
 mod() ->
-    case storage() of
+    mod(storage()).
+
+mod(Storage) ->
+    case Storage of
         #{type := local} ->
             emqx_ft_storage_fs
         % emqx_ft_storage_dummy

@@ -72,3 +113,11 @@ mod() ->

 storage() ->
     emqx_config:get([file_transfer, storage]).
+
+with_local_storage(Fun) ->
+    case storage() of
+        #{type := local} = Storage ->
+            Fun(mod(Storage), Storage);
+        #{type := Type} ->
+            {error, {unsupported_storage_type, Type}}
+    end.
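These local wrappers exist to be called over RPC; together with the BPAPI module added later in this diff, a remote segment read resolves roughly like this:

    %% emqx_ft_assembler (on the node doing the assembly)
    %%   -> emqx_ft_storage_fs_proto_v1:pread(Node, Transfer, Frag, Offset, Size)
    %%      -> erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size])
    %%         -> with_local_storage/1 picks the backend module for #{type := local}
    %%            -> emqx_ft_storage_fs:pread(Storage, Transfer, Frag, Offset, Size)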
@@ -16,45 +16,59 @@

 -module(emqx_ft_storage_fs).

--include_lib("typerefl/include/types.hrl").
--include_lib("hocon/include/hoconsc.hrl").
-
 -behaviour(emqx_ft_storage).

 -export([store_filemeta/3]).
 -export([store_segment/3]).
--export([list/2]).
--export([read_segment/5]).
+-export([list/3]).
+-export([pread/5]).
 -export([assemble/3]).

+-export([transfers/1]).
+
 -export([open_file/3]).
 -export([complete/4]).
 -export([write/2]).
 -export([discard/1]).

+-export_type([filefrag/1]).
+-export_type([filefrag/0]).
+-export_type([transferinfo/0]).
+
 -type transfer() :: emqx_ft:transfer().
 -type offset() :: emqx_ft:offset().

 -type filemeta() :: emqx_ft:filemeta().
--type segment() :: {offset(), _Content :: binary()}.
+-type segment() :: emqx_ft:segment().

 -type segmentinfo() :: #{
     offset := offset(),
     size := _Bytes :: non_neg_integer()
 }.

+-type transferinfo() :: #{
+    status := complete | incomplete,
+    result => [filefrag({result, #{}})]
+}.
+
+% TODO naming
 -type filefrag(T) :: #{
     path := file:name(),
     timestamp := emqx_datetime:epoch_second(),
+    size := _Bytes :: non_neg_integer(),
     fragment := T
 }.

--type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
+-type filefrag() :: filefrag(
+    {filemeta, filemeta()}
+    | {segment, segmentinfo()}
+    | {result, #{}}
+).

+-define(FRAGDIR, frags).
+-define(TEMPDIR, tmp).
+-define(RESULTDIR, result).
 -define(MANIFEST, "MANIFEST.json").
 -define(SEGMENT, "SEG").
--define(TEMP, "TMP").

 -type root() :: file:name().
@@ -80,7 +94,8 @@
     % Quota? Some lower level errors?
     {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
 store_filemeta(Storage, Transfer, Meta) ->
-    Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
+    % TODO safeguard against bad clientids / fileids.
+    Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
     case read_file(Filepath, fun decode_filemeta/1) of
         {ok, Meta} ->
             _ = touch_file(Filepath),

@@ -92,7 +107,7 @@ store_filemeta(Storage, Transfer, Meta) ->
             % about it too much now.
             {error, conflict};
         {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
-            write_file_atomic(Filepath, encode_filemeta(Meta))
+            write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta))
     end.

 %% Store a segment in the backing filesystem.

@@ -102,32 +117,47 @@ store_filemeta(Storage, Transfer, Meta) ->
     % Quota? Some lower level errors?
     ok | {error, _TODO}.
 store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
-    Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
-    write_file_atomic(Filepath, Content).
+    Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
+    write_file_atomic(Storage, Transfer, Filepath, Content).

--spec list(storage(), transfer()) ->
+-spec list(storage(), transfer(), _What :: fragment | result) ->
     % Some lower level errors? {error, notfound}?
     % Result will contain zero or only one filemeta.
-    {ok, list(filefrag())} | {error, _TODO}.
-list(Storage, Transfer) ->
-    Dirname = mk_filedir(Storage, Transfer),
+    {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}.
+list(Storage, Transfer, What) ->
+    Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
     case file:list_dir(Dirname) of
         {ok, Filenames} ->
-            {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
+            % TODO
+            % In case of `What = result` there might be more than one file (though
+            % extremely bad luck is needed for that, e.g. concurrent assemblers with
+            % different filemetas from different nodes). This might be unexpected for a
+            % client given the current protocol, yet might be helpful in the future.
+            {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
         {error, enoent} ->
            {ok, []};
         {error, _} = Error ->
            Error
    end.

--spec read_segment(
-    storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer()
-) ->
+get_subdirs_for(fragment) ->
+    [?FRAGDIR];
+get_subdirs_for(result) ->
+    [?RESULTDIR].
+
+get_filefrag_fun_for(fragment) ->
+    fun mk_filefrag/2;
+get_filefrag_fun_for(result) ->
+    fun mk_result_filefrag/2.
+
+-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
     {ok, _Content :: iodata()} | {error, _TODO}.
-read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
-    Filepath = maps:get(path, Segment),
-    case file:open(Filepath, [raw, read]) of
+pread(_Storage, _Transfer, Frag, Offset, Size) ->
+    Filepath = maps:get(path, Frag),
+    case file:open(Filepath, [read, raw, binary]) of
         {ok, IoDevice} ->
+            % NOTE
+            % Reading empty file is always `eof`.
             Read = file:pread(IoDevice, Offset, Size),
             ok = file:close(IoDevice),
             case Read of
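To illustrate the new `What` argument: `fragment` and `result` select different subdirectories and different fragment constructors. Hypothetical return values, with paths abbreviated:

    %% list(Storage, Transfer, fragment) scans the frags/ subdir:
    %%   {ok, [
    %%       #{path => ".../frags/MANIFEST.json", fragment => {filemeta, Meta}, ...},
    %%       #{path => ".../frags/SEG<...>",      fragment => {segment, #{offset => 0, size => 4096}}, ...}
    %%   ]}
    %% list(Storage, Transfer, result) scans the result/ subdir and, if the transfer
    %% has been assembled, yields the result file as a {result, #{}} fragment:
    %%   {ok, [#{path => ".../result/important.pdf", fragment => {result, #{}}, ...}]}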
@@ -148,15 +178,63 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
 assemble(Storage, Transfer, Callback) ->
     emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).

+%%
+
+-spec transfers(storage()) ->
+    {ok, #{transfer() => transferinfo()}}.
+transfers(Storage) ->
+    % TODO `Continuation`
+    % There might be millions of transfers on the node, we need a protocol and
+    % storage schema to iterate through them effectively.
+    ClientIds = try_list_dir(get_storage_root(Storage)),
+    {ok,
+        lists:foldl(
+            fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
+            #{},
+            ClientIds
+        )}.
+
+transfers(Storage, ClientId, AccIn) ->
+    Dirname = mk_client_filedir(Storage, ClientId),
+    case file:list_dir(Dirname) of
+        {ok, FileIds} ->
+            lists:foldl(
+                fun(FileId, Acc) ->
+                    Transfer = {filename_to_binary(ClientId), filename_to_binary(FileId)},
+                    read_transferinfo(Storage, Transfer, Acc)
+                end,
+                AccIn,
+                FileIds
+            );
+        {error, _Reason} ->
+            % TODO worth logging
+            AccIn
+    end.
+
+read_transferinfo(Storage, Transfer, Acc) ->
+    case list(Storage, Transfer, result) of
+        {ok, Result = [_ | _]} ->
+            Info = #{status => complete, result => Result},
+            Acc#{Transfer => Info};
+        {ok, []} ->
+            Info = #{status => incomplete},
+            Acc#{Transfer => Info};
+        {error, _Reason} ->
+            % TODO worth logging
+            Acc
+    end.
+
+%%
+
 -type handle() :: {file:name(), io:device(), crypto:hash_state()}.

 -spec open_file(storage(), transfer(), filemeta()) ->
     {ok, handle()} | {error, _TODO}.
 open_file(Storage, Transfer, Filemeta) ->
     Filename = maps:get(name, Filemeta),
-    Filepath = mk_filepath(Storage, Transfer, Filename),
-    TempFilepath = mk_temp_filepath(Filepath),
-    case file:open(TempFilepath, [write, raw]) of
+    TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
+    _ = filelib:ensure_dir(TempFilepath),
+    case file:open(TempFilepath, [write, raw, binary]) of
         {ok, Handle} ->
             _ = file:truncate(Handle),
             {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
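For reference, the aggregate shape returned by `transfers/1` (transfer ids, sizes and timestamps are made up):

    %% {ok, #{
    %%     {<<"clientA">>, <<"file-1">>} => #{
    %%         status => complete,
    %%         result => [#{path => ".../result/important.pdf", size => 12345,
    %%                      timestamp => 1674000000, fragment => {result, #{}}}]
    %%     },
    %%     {<<"clientB">>, <<"file-2">>} => #{status => incomplete}
    %% }}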
@@ -177,11 +255,11 @@ write({Filepath, IoDevice, Ctx}, IoData) ->
 -spec complete(storage(), transfer(), filemeta(), handle()) ->
     ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
 complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
-    TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)),
+    TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)),
     case verify_checksum(Ctx, Filemeta) of
         ok ->
             ok = file:close(IoDevice),
-            file:rename(Filepath, TargetFilepath);
+            mv_temp_file(Filepath, TargetFilepath);
         {error, _} = Error ->
             _ = discard(Handle),
             Error

@@ -224,17 +302,6 @@ verify_checksum(undefined, _) ->

 -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).

-schema() ->
-    #{
-        roots => [
-            {name, hoconsc:mk(string(), #{required => true})},
-            {size, hoconsc:mk(non_neg_integer())},
-            {expire_at, hoconsc:mk(non_neg_integer())},
-            {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
-            {segments_ttl, hoconsc:mk(pos_integer())}
-        ]
-    }.
-
 % encode_filemeta(Meta) ->
 % emqx_json:encode(
 % ?PRELUDE(

@@ -261,26 +328,14 @@ schema() ->

 encode_filemeta(Meta) ->
     % TODO: Looks like this should be hocon's responsibility.
-    Term = hocon_tconf:make_serializable(schema(), emqx_map_lib:binary_key_map(Meta), #{}),
+    Schema = emqx_ft_schema:schema(filemeta),
+    Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}),
     emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).

 decode_filemeta(Binary) ->
+    Schema = emqx_ft_schema:schema(filemeta),
     ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
-    hocon_tconf:check_plain(schema(), Term, #{atom_key => true, required => false}).
+    hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}).

-converter(checksum) ->
-    fun
-        (undefined, #{}) ->
-            undefined;
-        ({sha256, Bin}, #{make_serializable := true}) ->
-            _ = is_binary(Bin) orelse throw({expected_type, string}),
-            _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
-            binary:encode_hex(Bin);
-        (Hex, #{}) ->
-            _ = is_binary(Hex) orelse throw({expected_type, string}),
-            _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
-            {sha256, binary:decode_hex(Hex)}
-    end.
-
 % map_into(Fun, Into, Ks, Map) ->
 % map_foldr(map_into_fn(Fun, Into), Into, Ks, Map).
@@ -310,11 +365,20 @@ break_segment_filename(Filename) ->
             {error, invalid}
     end.

-mk_filedir(Storage, {ClientId, FileId}) ->
-    filename:join([get_storage_root(Storage), ClientId, FileId]).
+mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
+    filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]).

-mk_filepath(Storage, Transfer, Filename) ->
-    filename:join(mk_filedir(Storage, Transfer), Filename).
+mk_client_filedir(Storage, ClientId) ->
+    filename:join([get_storage_root(Storage), ClientId]).

+mk_filepath(Storage, Transfer, SubDirs, Filename) ->
+    filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename).
+
+try_list_dir(Dirname) ->
+    case file:list_dir(Dirname) of
+        {ok, List} -> List;
+        {error, _} -> []
+    end.
+
 get_storage_root(Storage) ->
     maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).

@@ -341,13 +405,13 @@ safe_decode(Content, DecodeFun) ->
             {error, corrupted}
     end.

-write_file_atomic(Filepath, Content) when is_binary(Content) ->
-    TempFilepath = mk_temp_filepath(Filepath),
+write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
+    TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
     Result = emqx_misc:pipeline(
         [
             fun filelib:ensure_dir/1,
             fun write_contents/2,
-            fun(FP) -> mv_temp_file(Filepath, FP) end
+            fun(_) -> mv_temp_file(TempFilepath, Filepath) end
         ],
         TempFilepath,
         Content

@@ -360,11 +424,9 @@ write_file_atomic(Filepath, Content) when is_binary(Content) ->
             {error, Reason}
     end.

-mk_temp_filepath(Filepath) ->
-    Dirname = filename:dirname(Filepath),
-    Filename = filename:basename(Filepath),
+mk_temp_filepath(Storage, Transfer, Filename) ->
     Unique = erlang:unique_integer([positive]),
-    filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])).
+    filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])).

 mk_filename(Comps) ->
     lists:append(lists:map(fun mk_filename_component/1, Comps)).

@@ -377,7 +439,8 @@ mk_filename_component(S) when is_list(S) -> S.
 write_contents(Filepath, Content) ->
     file:write_file(Filepath, Content).

-mv_temp_file(Filepath, TempFilepath) ->
+mv_temp_file(TempFilepath, Filepath) ->
+    _ = filelib:ensure_dir(Filepath),
     file:rename(TempFilepath, Filepath).

 touch_file(Filepath) ->

@@ -391,9 +454,16 @@ mk_filefrag(Dirname, Filename = ?MANIFEST) ->
     mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
 mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
     mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
-mk_filefrag(_Dirname, _) ->
+mk_filefrag(_Dirname, _Filename) ->
+    % TODO this is unexpected, worth logging?
     false.

+mk_result_filefrag(Dirname, Filename) ->
+    % NOTE
+    % Any file in the `?RESULTDIR` subdir is currently considered the result of
+    % the file transfer.
+    mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
+
 mk_filefrag(Dirname, Filename, Tag, Fun) ->
     Filepath = filename:join(Dirname, Filename),
     % TODO error handling?

@@ -403,9 +473,11 @@ mk_filefrag(Dirname, Filename, Tag, Fun) ->
             {true, #{
                 path => Filepath,
                 timestamp => Fileinfo#file_info.mtime,
+                size => Fileinfo#file_info.size,
                 fragment => {Tag, Frag}
             }};
         {error, _Reason} ->
+            % TODO loss of information
            false
    end.

@@ -414,3 +486,6 @@ read_filemeta(_Filename, Filepath) ->

 read_segmentinfo(Filename, _Filepath) ->
     break_segment_filename(Filename).
+
+filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S);
+filename_to_binary(B) when is_binary(B) -> B.
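Putting the path helpers together, the per-transfer directory layout after this change looks roughly as follows (segment file names abbreviated):

    %% <storage root>/
    %%   <clientid>/
    %%     <fileid>/
    %%       frags/    MANIFEST.json plus SEG* segment files; listed with `fragment`
    %%       tmp/      <unique>.<filename> staging files used by write_file_atomic/4 and open_file/3
    %%       result/   the assembled file, moved here by complete/4; listed with `result`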
@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([introduced_in/0]).
+
+-export([list/3]).
+-export([multilist/3]).
+-export([pread/5]).
+-export([transfers/1]).
+
+-type offset() :: emqx_ft:offset().
+-type transfer() :: emqx_ft:transfer().
+-type filefrag() :: emqx_ft_storage_fs:filefrag().
+-type transferinfo() :: emqx_ft_storage_fs:transferinfo().
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.17".
+
+-spec list(node(), transfer(), fragment | result) ->
+    {ok, [filefrag()]} | {error, term()}.
+list(Node, Transfer, What) ->
+    erpc:call(Node, emqx_ft_storage, list_local, [Transfer, What]).
+
+-spec multilist([node()], transfer(), fragment | result) ->
+    emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
+multilist(Nodes, Transfer, What) ->
+    erpc:multicall(Nodes, emqx_ft_storage, list_local, [Transfer, What]).
+
+-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
+    {ok, [filefrag()]} | {error, term()}.
+pread(Node, Transfer, Frag, Offset, Size) ->
+    erpc:call(Node, emqx_ft_storage, pread_local, [Transfer, Frag, Offset, Size]).
+
+-spec transfers([node()]) ->
+    emqx_rpc:erpc_multicall({ok, #{transfer() => transferinfo()}} | {error, term()}).
+transfers(Nodes) ->
+    erpc:multicall(Nodes, emqx_ft_storage, local_transfers, []).
@@ -24,24 +24,31 @@
 -include_lib("kernel/include/file.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").

-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        t_assemble_empty_transfer,
+        t_assemble_complete_local_transfer,
+
+        % NOTE
+        % It depends on the side effects of all previous testcases.
+        t_list_transfers
+    ].

 init_per_suite(Config) ->
-    % {ok, Apps} = application:ensure_all_started(emqx_ft),
-    % [{suite_apps, Apps} | Config].
-    % ok = emqx_common_test_helpers:start_apps([emqx_ft]),
     Config.

 end_per_suite(_Config) ->
-    % lists:foreach(fun application:stop/1, lists:reverse(?config(suite_apps, Config))).
-    % ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
     ok.

 init_per_testcase(TC, Config) ->
     ok = snabbkaffe:start_trace(),
-    Root = filename:join(["roots", TC]),
     {ok, Pid} = emqx_ft_assembler_sup:start_link(),
-    [{storage_root, Root}, {assembler_sup, Pid} | Config].
+    [
+        {storage_root, "file_transfer_root"},
+        {file_id, atom_to_binary(TC)},
+        {assembler_sup, Pid}
+        | Config
+    ].

 end_per_testcase(_TC, Config) ->
     ok = inspect_storage_root(Config),

@@ -51,11 +58,12 @@ end_per_testcase(_TC, Config) ->

 %%

--define(CLIENTID, <<"thatsme">>).
+-define(CLIENTID1, <<"thatsme">>).
+-define(CLIENTID2, <<"thatsnotme">>).

 t_assemble_empty_transfer(Config) ->
     Storage = storage(Config),
-    Transfer = {?CLIENTID, mk_fileid()},
+    Transfer = {?CLIENTID1, ?config(file_id, Config)},
     Filename = "important.pdf",
     Meta = #{
         name => Filename,

@@ -71,7 +79,7 @@ t_assemble_empty_transfer(Config) ->
                 fragment := {filemeta, Meta}
             }
         ]},
-        emqx_ft_storage_fs:list(Storage, Transfer)
+        emqx_ft_storage_fs:list(Storage, Transfer, fragment)
     ),
     {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
     {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),

@@ -81,11 +89,16 @@ t_assemble_empty_transfer(Config) ->
        % TODO
        file:read_file(mk_assembly_filename(Config, Transfer, Filename))
     ),
+    {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
+    ?assertEqual(
+        {error, eof},
+        emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
+    ),
     ok.

 t_assemble_complete_local_transfer(Config) ->
     Storage = storage(Config),
-    Transfer = {?CLIENTID, mk_fileid()},
+    Transfer = {?CLIENTID2, ?config(file_id, Config)},
     Filename = "topsecret.pdf",
     TransferSize = 10000 + rand:uniform(50000),
     SegmentSize = 4096,

@@ -109,7 +122,7 @@ t_assemble_complete_local_transfer(Config) ->
        end
     ),

-    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
+    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
     ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
     ?assertEqual(
         [Meta],

@@ -122,6 +135,16 @@ t_assemble_complete_local_transfer(Config) ->
     ?assertMatch(#{result := ok}, Event),

     AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
+    ?assertMatch(
+        {ok, [
+            #{
+                path := AssemblyFilename,
+                size := TransferSize,
+                fragment := {result, #{}}
+            }
+        ]},
+        emqx_ft_storage_fs:list(Storage, Transfer, result)
+    ),
     ?assertMatch(
         {ok, #file_info{type = regular, size = TransferSize}},
         file:read_file_info(AssemblyFilename)

@@ -133,13 +156,31 @@ t_assemble_complete_local_transfer(Config) ->
     ).

 mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
-    filename:join([?config(storage_root, Config), ClientID, FileID, Filename]).
+    filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]).

 on_assembly_finished(Result) ->
     ?tp(test_assembly_finished, #{result => Result}).

 %%

+t_list_transfers(Config) ->
+    Storage = storage(Config),
+    ?assertMatch(
+        {ok, #{
+            {?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
+                status := complete,
+                result := [#{path := _, size := 0, fragment := {result, _}}]
+            },
+            {?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
+                status := complete,
+                result := [#{path := _, size := Size, fragment := {result, _}}]
+            }
+        }} when Size > 0,
+        emqx_ft_storage_fs:transfers(Storage)
+    ).
+
+%%
+
 -include_lib("kernel/include/file.hrl").

 inspect_storage_root(Config) ->