From 75070102ece6eb7513a947c6e6368a3b9b64486c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 20 Feb 2023 18:22:51 +0300 Subject: [PATCH] fix(ft): improve typespecs --- apps/emqx_ft/src/emqx_ft_assembler.erl | 2 +- apps/emqx_ft/src/emqx_ft_assembly.erl | 46 +++++++++++++++++++++---- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 23 ++++++++----- 3 files changed, 55 insertions(+), 16 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 441303270..9e3ddfcbd 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -26,7 +26,7 @@ -record(st, { storage :: _Storage, transfer :: emqx_ft:transfer(), - assembly :: _TODO, + assembly :: emqx_ft_assembly:t(), file :: {file:filename(), io:device(), term()} | undefined, hash }). diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl index 2d78b540b..f0f0026a3 100644 --- a/apps/emqx_ft/src/emqx_ft_assembly.erl +++ b/apps/emqx_ft/src/emqx_ft_assembly.erl @@ -25,16 +25,43 @@ -export([coverage/1]). -export([properties/1]). +-export_type([t/0]). + +-type filemeta() :: emqx_ft:filemeta(). +-type filefrag() :: emqx_ft_storage_fs:filefrag(). +-type filefrag(T) :: emqx_ft_storage_fs:filefrag(T). +-type segmentinfo() :: emqx_ft_storage_fs:segmentinfo(). + -record(asm, { - status :: _TODO, - coverage :: _TODO, - properties :: _TODO, - meta :: _TODO, - % orddict:orddict(K, V) - segs :: _TODO, - size + status :: status(), + coverage :: coverage() | undefined, + properties :: properties() | undefined, + meta :: orddict:orddict( + filemeta(), + {node(), filefrag({filemeta, filemeta()})} + ), + segs :: orddict:orddict( + {emqx_ft:offset(), _Locality, _MEnd, node()}, + filefrag({segment, segmentinfo()}) + ), + size :: emqx_ft:bytes() }). +-type status() :: + {incomplete, {missing, _}} + | complete + | {error, {inconsistent, _}}. + +-type coverage() :: [{node(), filefrag({segment, segmentinfo()})}]. + +-type properties() :: #{ + %% Node where "most" of the segments are located. + dominant => node() +}. + +-opaque t() :: #asm{}. + +-spec new(emqx_ft:bytes()) -> t(). new(Size) -> #asm{ status = {incomplete, {missing, filemeta}}, @@ -43,6 +70,7 @@ new(Size) -> size = Size }. +-spec append(t(), node(), filefrag() | [filefrag()]) -> t(). append(Asm, Node, Fragments) when is_list(Fragments) -> lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments); append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) -> @@ -50,6 +78,7 @@ append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) -> append(Asm, Node, Segment = #{fragment := {segment, _}}) -> append_segmentinfo(Asm, Node, Segment). +-spec update(t()) -> t(). update(Asm) -> case status(meta, Asm) of {complete, _Meta} -> @@ -67,15 +96,18 @@ update(Asm) -> Asm#asm{status = Status} end. +-spec status(t()) -> status(). status(#asm{status = Status}) -> Status. +-spec filemeta(t()) -> filemeta(). filemeta(Asm) -> case status(meta, Asm) of {complete, Meta} -> Meta; _Other -> undefined end. +-spec coverage(t()) -> coverage() | undefined. coverage(#asm{coverage = Coverage}) -> Coverage. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 103f0e48d..d3331a091 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -80,11 +80,18 @@ -type storage() :: emqx_ft_storage:storage(). +-type file_error() :: + file:posix() + %% Filename is incompatible with the backing filesystem. + | badarg + %% System limit (e.g. number of ports) reached. + | system_limit. + %% Store manifest in the backing filesystem. %% Atomic operation. -spec store_filemeta(storage(), transfer(), filemeta()) -> % Quota? Some lower level errors? - ok | {error, conflict} | {error, _TODO}. + ok | {error, conflict} | {error, file_error()}. store_filemeta(Storage, Transfer, Meta) -> % TODO safeguard against bad clientids / fileids. Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), @@ -110,7 +117,7 @@ store_filemeta(Storage, Transfer, Meta) -> -spec store_segment(storage(), transfer(), segment()) -> % Where is the checksum gets verified? Upper level probably. % Quota? Some lower level errors? - ok | {error, _TODO}. + ok | {error, file_error()}. store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), write_file_atomic(Storage, Transfer, Filepath, Content). @@ -118,7 +125,8 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> -spec list(storage(), transfer(), _What :: fragment | result) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. - {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}. + {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} + | {error, file_error()}. list(Storage, Transfer, What) -> Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)), case file:list_dir(Dirname) of @@ -146,7 +154,7 @@ get_filefrag_fun_for(result) -> fun mk_result_filefrag/2. -spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> - {ok, _Content :: iodata()} | {error, _TODO}. + {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}. pread(_Storage, _Transfer, Frag, Offset, Size) -> Filepath = maps:get(path, Frag), case file:open(Filepath, [read, raw, binary]) of @@ -168,7 +176,6 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> end. -spec assemble(storage(), transfer(), emqx_ft:bytes()) -> - % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {async, _Assembler :: pid()} | {error, _TODO}. assemble(Storage, Transfer, Size) -> % TODO: ask cluster if the transfer is already assembled @@ -321,7 +328,7 @@ read_transferinfo(Storage, Transfer, Acc) -> -type handle() :: {file:name(), io:device(), crypto:hash_state()}. -spec open_file(storage(), transfer(), filemeta()) -> - {ok, handle()} | {error, _TODO}. + {ok, handle()} | {error, file_error()}. open_file(Storage, Transfer, Filemeta) -> Filename = maps:get(name, Filemeta), TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), @@ -335,7 +342,7 @@ open_file(Storage, Transfer, Filemeta) -> end. -spec write(handle(), iodata()) -> - {ok, handle()} | {error, _TODO}. + {ok, handle()} | {error, file_error()}. write({Filepath, IoDevice, Ctx}, IoData) -> case file:write(IoDevice, IoData) of ok -> @@ -345,7 +352,7 @@ write({Filepath, IoDevice, Ctx}, IoData) -> end. -spec complete(storage(), transfer(), filemeta(), handle()) -> - ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}. + ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) -> TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)), case verify_checksum(Ctx, Filemeta) of