fix(ft-fs): add missing `read_segment/5` + fix atomic write

This commit is contained in:
Andrew Mayorov 2023-02-02 21:31:04 +03:00 committed by Ilya Averyanov
parent 5dae423f1e
commit f9078e8401
2 changed files with 46 additions and 15 deletions

View File

@ -36,7 +36,7 @@ start_child(Storage, Transfer, Callback) ->
init(_) -> init(_) ->
SupFlags = #{ SupFlags = #{
strategy => one_for_one, strategy => one_for_one,
intensity => 100, intensity => 10,
period => 1000 period => 1000
}, },
{ok, {SupFlags, []}}. {ok, {SupFlags, []}}.

View File

@ -24,6 +24,7 @@
-export([store_filemeta/3]). -export([store_filemeta/3]).
-export([store_segment/3]). -export([store_segment/3]).
-export([list/2]). -export([list/2]).
-export([read_segment/5]).
-export([assemble/3]). -export([assemble/3]).
-export([open_file/3]). -export([open_file/3]).
@ -154,6 +155,28 @@ list(Storage, Transfer) ->
Error Error
end. end.
-spec read_segment(
storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer()
) ->
{ok, _Content :: iodata()} | {error, _TODO}.
read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
Filepath = maps:get(path, Segment),
case file:open(Filepath, [raw, read]) of
{ok, IoDevice} ->
Read = file:pread(IoDevice, Offset, Size),
ok = file:close(IoDevice),
case Read of
{ok, Content} ->
{ok, Content};
eof ->
{error, eof};
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{ok, _Assembler :: pid()} | {error, _TODO}. {ok, _Assembler :: pid()} | {error, _TODO}.
@ -359,18 +382,18 @@ safe_decode(Content, DecodeFun) ->
end. end.
write_file_atomic(Filepath, Content) when is_binary(Content) -> write_file_atomic(Filepath, Content) when is_binary(Content) ->
TempFilepath = mk_temp_filepath(Filepath),
Result = emqx_misc:pipeline( Result = emqx_misc:pipeline(
[ [
fun filelib:ensure_dir/1, fun filelib:ensure_dir/1,
fun mk_temp_filepath/1,
fun write_contents/2, fun write_contents/2,
fun mv_temp_file/1 fun(FP) -> mv_temp_file(Filepath, FP) end
], ],
Filepath, TempFilepath,
Content Content
), ),
case Result of case Result of
{ok, {Filepath, TempFilepath}, _} -> {ok, _, _} ->
_ = file:delete(TempFilepath), _ = file:delete(TempFilepath),
ok; ok;
{error, Reason, _} -> {error, Reason, _} ->
@ -381,13 +404,20 @@ mk_temp_filepath(Filepath) ->
Dirname = filename:dirname(Filepath), Dirname = filename:dirname(Filepath),
Filename = filename:basename(Filepath), Filename = filename:basename(Filepath),
Unique = erlang:unique_integer([positive]), Unique = erlang:unique_integer([positive]),
TempFilepath = filename:join(Dirname, ?TEMP ++ integer_to_list(Unique) ++ "." ++ Filename), filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])).
{Filepath, TempFilepath}.
write_contents({_Filepath, TempFilepath}, Content) -> mk_filename(Comps) ->
file:write_file(TempFilepath, Content). lists:append(lists:map(fun mk_filename_component/1, Comps)).
mv_temp_file({Filepath, TempFilepath}) -> mk_filename_component(I) when is_integer(I) -> integer_to_list(I);
mk_filename_component(A) when is_atom(A) -> atom_to_list(A);
mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B);
mk_filename_component(S) when is_list(S) -> S.
write_contents(Filepath, Content) ->
file:write_file(Filepath, Content).
mv_temp_file(Filepath, TempFilepath) ->
file:rename(TempFilepath, Filepath). file:rename(TempFilepath, Filepath).
touch_file(Filepath) -> touch_file(Filepath) ->
@ -398,21 +428,22 @@ filtermap_files(Fun, Dirname, Filenames) ->
lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames). lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
mk_filefrag(Dirname, Filename = ?MANIFEST) -> mk_filefrag(Dirname, Filename = ?MANIFEST) ->
mk_filefrag(Dirname, Filename, fun read_filemeta/2); mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) -> mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
mk_filefrag(Dirname, Filename, fun read_segmentinfo/2); mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
mk_filefrag(_Dirname, _) -> mk_filefrag(_Dirname, _) ->
false. false.
mk_filefrag(Dirname, Filename, Fun) -> mk_filefrag(Dirname, Filename, Tag, Fun) ->
Filepath = filename:join(Dirname, Filename), Filepath = filename:join(Dirname, Filename),
Fileinfo = file:read_file_info(Filepath), % TODO error handling?
{ok, Fileinfo} = file:read_file_info(Filepath),
case Fun(Filename, Filepath) of case Fun(Filename, Filepath) of
{ok, Frag} -> {ok, Frag} ->
{true, #{ {true, #{
path => Filepath, path => Filepath,
timestamp => Fileinfo#file_info.mtime, timestamp => Fileinfo#file_info.mtime,
fragment => Frag fragment => {Tag, Frag}
}}; }};
{error, _Reason} -> {error, _Reason} ->
false false