From 19cd66198bb0314ea807849e4ede375efc1e248b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 3 Feb 2023 14:23:58 +0300 Subject: [PATCH] feat(ft-fs): make `list` / `read` more generic And usable in wider contexts as a consequence, for example querying and fetching resulting files from remote nodes. --- apps/emqx_ft/src/emqx_ft_assembler.erl | 6 +- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 60 ++++++++++++++----- apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl | 19 +++++- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 4fd8d6e75..af50dee3b 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -66,7 +66,7 @@ init({Storage, Transfer, Callback}) -> handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)? - {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer), + {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment), NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), NSt = St#st{assembly = NAsm}, case emqx_ft_assembly:status(NAsm) of @@ -81,7 +81,7 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> end; handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> % TODO: portable "storage" ref - Args = [St#st.storage, St#st.transfer], + Args = [St#st.storage, St#st.transfer, fragment], % TODO % Async would better because we would not need to wait for some lagging nodes if % the coverage is already complete. @@ -121,7 +121,7 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % this node garbage collecting the segment itself. Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)], % TODO: pipelining - case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of + case erpc:call(Node, emqx_ft_storage_fs, pread, Args, ?RPC_READSEG_TIMEOUT) of {ok, Content} -> {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index cc9d94b35..fdf4558b8 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -20,8 +20,8 @@ -export([store_filemeta/3]). -export([store_segment/3]). --export([list/2]). --export([read_segment/5]). +-export([list/3]). +-export([pread/5]). -export([assemble/3]). -export([open_file/3]). @@ -41,13 +41,19 @@ size := _Bytes :: non_neg_integer() }. +% TODO naming -type filefrag(T) :: #{ path := file:name(), timestamp := emqx_datetime:epoch_second(), + size := _Bytes :: non_neg_integer(), fragment := T }. --type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}). +-type filefrag() :: filefrag( + {filemeta, filemeta()} + | {segment, segmentinfo()} + | {result, #{}} +). -define(FRAGDIR, frags). -define(TEMPDIR, tmp). @@ -104,29 +110,44 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), write_file_atomic(Storage, Transfer, Filepath, Content). --spec list(storage(), transfer()) -> +-spec list(storage(), transfer(), _What :: fragment | result) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. - {ok, list(filefrag())} | {error, _TODO}. -list(Storage, Transfer) -> - Dirname = mk_filedir(Storage, Transfer, [?FRAGDIR]), + {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}. +list(Storage, Transfer, What) -> + Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)), case file:list_dir(Dirname) of {ok, Filenames} -> - {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)}; + % TODO + % In case of `What = result` there might be more than one file (though + % extremely bad luck is needed for that, e.g. concurrent assemblers with + % different filemetas from different nodes). This might be unexpected for a + % client given the current protocol, yet might be helpful in the future. + {ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)}; {error, enoent} -> {ok, []}; {error, _} = Error -> Error end. --spec read_segment( - storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer() -) -> +get_subdirs_for(fragment) -> + [?FRAGDIR]; +get_subdirs_for(result) -> + [?RESULTDIR]. + +get_filefrag_fun_for(fragment) -> + fun mk_filefrag/2; +get_filefrag_fun_for(result) -> + fun mk_result_filefrag/2. + +-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> {ok, _Content :: iodata()} | {error, _TODO}. -read_segment(_Storage, _Transfer, Segment, Offset, Size) -> - Filepath = maps:get(path, Segment), - case file:open(Filepath, [raw, read]) of +pread(_Storage, _Transfer, Frag, Offset, Size) -> + Filepath = maps:get(path, Frag), + case file:open(Filepath, [read, raw, binary]) of {ok, IoDevice} -> + % NOTE + % Reading empty file is always `eof`. Read = file:pread(IoDevice, Offset, Size), ok = file:close(IoDevice), case Read of @@ -147,6 +168,8 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) -> assemble(Storage, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). +%% + -type handle() :: {file:name(), io:device(), crypto:hash_state()}. -spec open_file(storage(), transfer(), filemeta()) -> @@ -155,7 +178,7 @@ open_file(Storage, Transfer, Filemeta) -> Filename = maps:get(name, Filemeta), TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), _ = filelib:ensure_dir(TempFilepath), - case file:open(TempFilepath, [write, raw]) of + case file:open(TempFilepath, [write, raw, binary]) of {ok, Handle} -> _ = file:truncate(Handle), {ok, {TempFilepath, Handle, init_checksum(Filemeta)}}; @@ -370,6 +393,12 @@ mk_filefrag(_Dirname, _Filename) -> % TODO this is unexpected, worth logging? false. +mk_result_filefrag(Dirname, Filename) -> + % NOTE + % Any file in the `?RESULTDIR` subdir is currently considered the result of + % the file transfer. + mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end). + mk_filefrag(Dirname, Filename, Tag, Fun) -> Filepath = filename:join(Dirname, Filename), % TODO error handling? @@ -379,6 +408,7 @@ mk_filefrag(Dirname, Filename, Tag, Fun) -> {true, #{ path => Filepath, timestamp => Fileinfo#file_info.mtime, + size => Fileinfo#file_info.size, fragment => {Tag, Frag} }}; {error, _Reason} -> diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index fdfdd432e..77a619fe3 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -71,7 +71,7 @@ t_assemble_empty_transfer(Config) -> fragment := {filemeta, Meta} } ]}, - emqx_ft_storage_fs:list(Storage, Transfer) + emqx_ft_storage_fs:list(Storage, Transfer, fragment) ), {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1), {ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}), @@ -81,6 +81,11 @@ t_assemble_empty_transfer(Config) -> % TODO file:read_file(mk_assembly_filename(Config, Transfer, Filename)) ), + {ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result), + ?assertEqual( + {error, eof}, + emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size) + ), ok. t_assemble_complete_local_transfer(Config) -> @@ -109,7 +114,7 @@ t_assemble_complete_local_transfer(Config) -> end ), - {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer), + {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment), ?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)), ?assertEqual( [Meta], @@ -122,6 +127,16 @@ t_assemble_complete_local_transfer(Config) -> ?assertMatch(#{result := ok}, Event), AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename), + ?assertMatch( + {ok, [ + #{ + path := AssemblyFilename, + size := TransferSize, + fragment := {result, #{}} + } + ]}, + emqx_ft_storage_fs:list(Storage, Transfer, result) + ), ?assertMatch( {ok, #file_info{type = regular, size = TransferSize}}, file:read_file_info(AssemblyFilename)