feat(ft-fs): make `list` / `read` more generic

And usable in wider contexts as a consequence, for example querying and
fetching resulting files from remote nodes.
This commit is contained in:
Andrew Mayorov 2023-02-03 14:23:58 +03:00
parent 3bb08fe945
commit 19cd66198b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 65 additions and 20 deletions

View File

@ -66,7 +66,7 @@ init({Storage, Transfer, Callback}) ->
handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
% TODO: what we do with non-transients errors here (e.g. `eacces`)? % TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer), {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
NSt = St#st{assembly = NAsm}, NSt = St#st{assembly = NAsm},
case emqx_ft_assembly:status(NAsm) of case emqx_ft_assembly:status(NAsm) of
@ -81,7 +81,7 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
end; end;
handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
% TODO: portable "storage" ref % TODO: portable "storage" ref
Args = [St#st.storage, St#st.transfer], Args = [St#st.storage, St#st.transfer, fragment],
% TODO % TODO
% Async would better because we would not need to wait for some lagging nodes if % Async would better because we would not need to wait for some lagging nodes if
% the coverage is already complete. % the coverage is already complete.
@ -121,7 +121,7 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% this node garbage collecting the segment itself. % this node garbage collecting the segment itself.
Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)], Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
% TODO: pipelining % TODO: pipelining
case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of
{ok, Content} -> {ok, Content} ->
{ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
{next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}

View File

@ -20,8 +20,8 @@
-export([store_filemeta/3]). -export([store_filemeta/3]).
-export([store_segment/3]). -export([store_segment/3]).
-export([list/2]). -export([list/3]).
-export([read_segment/5]). -export([pread/5]).
-export([assemble/3]). -export([assemble/3]).
-export([open_file/3]). -export([open_file/3]).
@ -41,13 +41,19 @@
size := _Bytes :: non_neg_integer() size := _Bytes :: non_neg_integer()
}. }.
% TODO naming
-type filefrag(T) :: #{ -type filefrag(T) :: #{
path := file:name(), path := file:name(),
timestamp := emqx_datetime:epoch_second(), timestamp := emqx_datetime:epoch_second(),
size := _Bytes :: non_neg_integer(),
fragment := T fragment := T
}. }.
-type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}). -type filefrag() :: filefrag(
{filemeta, filemeta()}
| {segment, segmentinfo()}
| {result, #{}}
).
-define(FRAGDIR, frags). -define(FRAGDIR, frags).
-define(TEMPDIR, tmp). -define(TEMPDIR, tmp).
@ -104,29 +110,44 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
write_file_atomic(Storage, Transfer, Filepath, Content). write_file_atomic(Storage, Transfer, Filepath, Content).
-spec list(storage(), transfer()) -> -spec list(storage(), transfer(), _What :: fragment | result) ->
% Some lower level errors? {error, notfound}? % Some lower level errors? {error, notfound}?
% Result will contain zero or only one filemeta. % Result will contain zero or only one filemeta.
{ok, list(filefrag())} | {error, _TODO}. {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}.
list(Storage, Transfer) -> list(Storage, Transfer, What) ->
Dirname = mk_filedir(Storage, Transfer, [?FRAGDIR]), Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
case file:list_dir(Dirname) of case file:list_dir(Dirname) of
{ok, Filenames} -> {ok, Filenames} ->
{ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)}; % TODO
% In case of `What = result` there might be more than one file (though
% extremely bad luck is needed for that, e.g. concurrent assemblers with
% different filemetas from different nodes). This might be unexpected for a
% client given the current protocol, yet might be helpful in the future.
{ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
{error, enoent} -> {error, enoent} ->
{ok, []}; {ok, []};
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
-spec read_segment( get_subdirs_for(fragment) ->
storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer() [?FRAGDIR];
) -> get_subdirs_for(result) ->
[?RESULTDIR].
get_filefrag_fun_for(fragment) ->
fun mk_filefrag/2;
get_filefrag_fun_for(result) ->
fun mk_result_filefrag/2.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, _Content :: iodata()} | {error, _TODO}. {ok, _Content :: iodata()} | {error, _TODO}.
read_segment(_Storage, _Transfer, Segment, Offset, Size) -> pread(_Storage, _Transfer, Frag, Offset, Size) ->
Filepath = maps:get(path, Segment), Filepath = maps:get(path, Frag),
case file:open(Filepath, [raw, read]) of case file:open(Filepath, [read, raw, binary]) of
{ok, IoDevice} -> {ok, IoDevice} ->
% NOTE
% Reading empty file is always `eof`.
Read = file:pread(IoDevice, Offset, Size), Read = file:pread(IoDevice, Offset, Size),
ok = file:close(IoDevice), ok = file:close(IoDevice),
case Read of case Read of
@ -147,6 +168,8 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
assemble(Storage, Transfer, Callback) -> assemble(Storage, Transfer, Callback) ->
emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
%%
-type handle() :: {file:name(), io:device(), crypto:hash_state()}. -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-spec open_file(storage(), transfer(), filemeta()) -> -spec open_file(storage(), transfer(), filemeta()) ->
@ -155,7 +178,7 @@ open_file(Storage, Transfer, Filemeta) ->
Filename = maps:get(name, Filemeta), Filename = maps:get(name, Filemeta),
TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
_ = filelib:ensure_dir(TempFilepath), _ = filelib:ensure_dir(TempFilepath),
case file:open(TempFilepath, [write, raw]) of case file:open(TempFilepath, [write, raw, binary]) of
{ok, Handle} -> {ok, Handle} ->
_ = file:truncate(Handle), _ = file:truncate(Handle),
{ok, {TempFilepath, Handle, init_checksum(Filemeta)}}; {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
@ -370,6 +393,12 @@ mk_filefrag(_Dirname, _Filename) ->
% TODO this is unexpected, worth logging? % TODO this is unexpected, worth logging?
false. false.
mk_result_filefrag(Dirname, Filename) ->
% NOTE
% Any file in the `?RESULTDIR` subdir is currently considered the result of
% the file transfer.
mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
mk_filefrag(Dirname, Filename, Tag, Fun) -> mk_filefrag(Dirname, Filename, Tag, Fun) ->
Filepath = filename:join(Dirname, Filename), Filepath = filename:join(Dirname, Filename),
% TODO error handling? % TODO error handling?
@ -379,6 +408,7 @@ mk_filefrag(Dirname, Filename, Tag, Fun) ->
{true, #{ {true, #{
path => Filepath, path => Filepath,
timestamp => Fileinfo#file_info.mtime, timestamp => Fileinfo#file_info.mtime,
size => Fileinfo#file_info.size,
fragment => {Tag, Frag} fragment => {Tag, Frag}
}}; }};
{error, _Reason} -> {error, _Reason} ->

View File

@ -71,7 +71,7 @@ t_assemble_empty_transfer(Config) ->
fragment := {filemeta, Meta} fragment := {filemeta, Meta}
} }
]}, ]},
emqx_ft_storage_fs:list(Storage, Transfer) emqx_ft_storage_fs:list(Storage, Transfer, fragment)
), ),
{ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1), {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
{ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}), {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
@ -81,6 +81,11 @@ t_assemble_empty_transfer(Config) ->
% TODO % TODO
file:read_file(mk_assembly_filename(Config, Transfer, Filename)) file:read_file(mk_assembly_filename(Config, Transfer, Filename))
), ),
{ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
?assertEqual(
{error, eof},
emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
),
ok. ok.
t_assemble_complete_local_transfer(Config) -> t_assemble_complete_local_transfer(Config) ->
@ -109,7 +114,7 @@ t_assemble_complete_local_transfer(Config) ->
end end
), ),
{ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer), {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)), ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
?assertEqual( ?assertEqual(
[Meta], [Meta],
@ -122,6 +127,16 @@ t_assemble_complete_local_transfer(Config) ->
?assertMatch(#{result := ok}, Event), ?assertMatch(#{result := ok}, Event),
AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
?assertMatch(
{ok, [
#{
path := AssemblyFilename,
size := TransferSize,
fragment := {result, #{}}
}
]},
emqx_ft_storage_fs:list(Storage, Transfer, result)
),
?assertMatch( ?assertMatch(
{ok, #file_info{type = regular, size = TransferSize}}, {ok, #file_info{type = regular, size = TransferSize}},
file:read_file_info(AssemblyFilename) file:read_file_info(AssemblyFilename)