fix(ft): improve typespecs

This commit is contained in:
Andrew Mayorov 2023-02-20 18:22:51 +03:00 committed by Ilya Averyanov
parent 2b925aa60b
commit 75070102ec
3 changed files with 55 additions and 16 deletions

View File

@ -26,7 +26,7 @@
-record(st, { -record(st, {
storage :: _Storage, storage :: _Storage,
transfer :: emqx_ft:transfer(), transfer :: emqx_ft:transfer(),
assembly :: _TODO, assembly :: emqx_ft_assembly:t(),
file :: {file:filename(), io:device(), term()} | undefined, file :: {file:filename(), io:device(), term()} | undefined,
hash hash
}). }).

View File

@ -25,16 +25,43 @@
-export([coverage/1]). -export([coverage/1]).
-export([properties/1]). -export([properties/1]).
-export_type([t/0]).
-type filemeta() :: emqx_ft:filemeta().
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-type filefrag(T) :: emqx_ft_storage_fs:filefrag(T).
-type segmentinfo() :: emqx_ft_storage_fs:segmentinfo().
-record(asm, { -record(asm, {
status :: _TODO, status :: status(),
coverage :: _TODO, coverage :: coverage() | undefined,
properties :: _TODO, properties :: properties() | undefined,
meta :: _TODO, meta :: orddict:orddict(
% orddict:orddict(K, V) filemeta(),
segs :: _TODO, {node(), filefrag({filemeta, filemeta()})}
size ),
segs :: orddict:orddict(
{emqx_ft:offset(), _Locality, _MEnd, node()},
filefrag({segment, segmentinfo()})
),
size :: emqx_ft:bytes()
}). }).
-type status() ::
{incomplete, {missing, _}}
| complete
| {error, {inconsistent, _}}.
-type coverage() :: [{node(), filefrag({segment, segmentinfo()})}].
-type properties() :: #{
%% Node where "most" of the segments are located.
dominant => node()
}.
-opaque t() :: #asm{}.
-spec new(emqx_ft:bytes()) -> t().
new(Size) -> new(Size) ->
#asm{ #asm{
status = {incomplete, {missing, filemeta}}, status = {incomplete, {missing, filemeta}},
@ -43,6 +70,7 @@ new(Size) ->
size = Size size = Size
}. }.
-spec append(t(), node(), filefrag() | [filefrag()]) -> t().
append(Asm, Node, Fragments) when is_list(Fragments) -> append(Asm, Node, Fragments) when is_list(Fragments) ->
lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments); lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments);
append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) -> append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
@ -50,6 +78,7 @@ append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
append(Asm, Node, Segment = #{fragment := {segment, _}}) -> append(Asm, Node, Segment = #{fragment := {segment, _}}) ->
append_segmentinfo(Asm, Node, Segment). append_segmentinfo(Asm, Node, Segment).
-spec update(t()) -> t().
update(Asm) -> update(Asm) ->
case status(meta, Asm) of case status(meta, Asm) of
{complete, _Meta} -> {complete, _Meta} ->
@ -67,15 +96,18 @@ update(Asm) ->
Asm#asm{status = Status} Asm#asm{status = Status}
end. end.
-spec status(t()) -> status().
status(#asm{status = Status}) -> status(#asm{status = Status}) ->
Status. Status.
-spec filemeta(t()) -> filemeta().
filemeta(Asm) -> filemeta(Asm) ->
case status(meta, Asm) of case status(meta, Asm) of
{complete, Meta} -> Meta; {complete, Meta} -> Meta;
_Other -> undefined _Other -> undefined
end. end.
-spec coverage(t()) -> coverage() | undefined.
coverage(#asm{coverage = Coverage}) -> coverage(#asm{coverage = Coverage}) ->
Coverage. Coverage.

View File

@ -80,11 +80,18 @@
-type storage() :: emqx_ft_storage:storage(). -type storage() :: emqx_ft_storage:storage().
-type file_error() ::
file:posix()
%% Filename is incompatible with the backing filesystem.
| badarg
%% System limit (e.g. number of ports) reached.
| system_limit.
%% Store manifest in the backing filesystem. %% Store manifest in the backing filesystem.
%% Atomic operation. %% Atomic operation.
-spec store_filemeta(storage(), transfer(), filemeta()) -> -spec store_filemeta(storage(), transfer(), filemeta()) ->
% Quota? Some lower level errors? % Quota? Some lower level errors?
ok | {error, conflict} | {error, _TODO}. ok | {error, conflict} | {error, file_error()}.
store_filemeta(Storage, Transfer, Meta) -> store_filemeta(Storage, Transfer, Meta) ->
% TODO safeguard against bad clientids / fileids. % TODO safeguard against bad clientids / fileids.
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
@ -110,7 +117,7 @@ store_filemeta(Storage, Transfer, Meta) ->
-spec store_segment(storage(), transfer(), segment()) -> -spec store_segment(storage(), transfer(), segment()) ->
% Where is the checksum gets verified? Upper level probably. % Where is the checksum gets verified? Upper level probably.
% Quota? Some lower level errors? % Quota? Some lower level errors?
ok | {error, _TODO}. ok | {error, file_error()}.
store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
write_file_atomic(Storage, Transfer, Filepath, Content). write_file_atomic(Storage, Transfer, Filepath, Content).
@ -118,7 +125,8 @@ store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
-spec list(storage(), transfer(), _What :: fragment | result) -> -spec list(storage(), transfer(), _What :: fragment | result) ->
% Some lower level errors? {error, notfound}? % Some lower level errors? {error, notfound}?
% Result will contain zero or only one filemeta. % Result will contain zero or only one filemeta.
{ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}. {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
| {error, file_error()}.
list(Storage, Transfer, What) -> list(Storage, Transfer, What) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)), Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
case file:list_dir(Dirname) of case file:list_dir(Dirname) of
@ -146,7 +154,7 @@ get_filefrag_fun_for(result) ->
fun mk_result_filefrag/2. fun mk_result_filefrag/2.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) -> -spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, _Content :: iodata()} | {error, _TODO}. {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
pread(_Storage, _Transfer, Frag, Offset, Size) -> pread(_Storage, _Transfer, Frag, Offset, Size) ->
Filepath = maps:get(path, Frag), Filepath = maps:get(path, Frag),
case file:open(Filepath, [read, raw, binary]) of case file:open(Filepath, [read, raw, binary]) of
@ -168,7 +176,6 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
end. end.
-spec assemble(storage(), transfer(), emqx_ft:bytes()) -> -spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{async, _Assembler :: pid()} | {error, _TODO}. {async, _Assembler :: pid()} | {error, _TODO}.
assemble(Storage, Transfer, Size) -> assemble(Storage, Transfer, Size) ->
% TODO: ask cluster if the transfer is already assembled % TODO: ask cluster if the transfer is already assembled
@ -321,7 +328,7 @@ read_transferinfo(Storage, Transfer, Acc) ->
-type handle() :: {file:name(), io:device(), crypto:hash_state()}. -type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-spec open_file(storage(), transfer(), filemeta()) -> -spec open_file(storage(), transfer(), filemeta()) ->
{ok, handle()} | {error, _TODO}. {ok, handle()} | {error, file_error()}.
open_file(Storage, Transfer, Filemeta) -> open_file(Storage, Transfer, Filemeta) ->
Filename = maps:get(name, Filemeta), Filename = maps:get(name, Filemeta),
TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
@ -335,7 +342,7 @@ open_file(Storage, Transfer, Filemeta) ->
end. end.
-spec write(handle(), iodata()) -> -spec write(handle(), iodata()) ->
{ok, handle()} | {error, _TODO}. {ok, handle()} | {error, file_error()}.
write({Filepath, IoDevice, Ctx}, IoData) -> write({Filepath, IoDevice, Ctx}, IoData) ->
case file:write(IoDevice, IoData) of case file:write(IoDevice, IoData) of
ok -> ok ->
@ -345,7 +352,7 @@ write({Filepath, IoDevice, Ctx}, IoData) ->
end. end.
-spec complete(storage(), transfer(), filemeta(), handle()) -> -spec complete(storage(), transfer(), filemeta(), handle()) ->
ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}. ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) -> complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)), TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)),
case verify_checksum(Ctx, Filemeta) of case verify_checksum(Ctx, Filemeta) of