diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index e837d96f1..b60949a5e 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -36,7 +36,7 @@ start_child(Storage, Transfer, Callback) -> init(_) -> SupFlags = #{ strategy => one_for_one, - intensity => 100, + intensity => 10, period => 1000 }, {ok, {SupFlags, []}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 15efa142b..cce7cc19e 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -24,6 +24,7 @@ -export([store_filemeta/3]). -export([store_segment/3]). -export([list/2]). +-export([read_segment/5]). -export([assemble/3]). -export([open_file/3]). @@ -154,6 +155,28 @@ list(Storage, Transfer) -> Error end. +-spec read_segment( + storage(), transfer(), filefrag(segmentinfo()), offset(), _Size :: non_neg_integer() +) -> + {ok, _Content :: iodata()} | {error, _TODO}. +read_segment(_Storage, _Transfer, Segment, Offset, Size) -> + Filepath = maps:get(path, Segment), + case file:open(Filepath, [raw, read]) of + {ok, IoDevice} -> + Read = file:pread(IoDevice, Offset, Size), + ok = file:close(IoDevice), + case Read of + {ok, Content} -> + {ok, Content}; + eof -> + {error, eof}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {ok, _Assembler :: pid()} | {error, _TODO}. @@ -359,18 +382,18 @@ safe_decode(Content, DecodeFun) -> end. write_file_atomic(Filepath, Content) when is_binary(Content) -> + TempFilepath = mk_temp_filepath(Filepath), Result = emqx_misc:pipeline( [ fun filelib:ensure_dir/1, - fun mk_temp_filepath/1, fun write_contents/2, - fun mv_temp_file/1 + fun(FP) -> mv_temp_file(Filepath, FP) end ], - Filepath, + TempFilepath, Content ), case Result of - {ok, {Filepath, TempFilepath}, _} -> + {ok, _, _} -> _ = file:delete(TempFilepath), ok; {error, Reason, _} -> @@ -381,13 +404,20 @@ mk_temp_filepath(Filepath) -> Dirname = filename:dirname(Filepath), Filename = filename:basename(Filepath), Unique = erlang:unique_integer([positive]), - TempFilepath = filename:join(Dirname, ?TEMP ++ integer_to_list(Unique) ++ "." ++ Filename), - {Filepath, TempFilepath}. + filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])). -write_contents({_Filepath, TempFilepath}, Content) -> - file:write_file(TempFilepath, Content). +mk_filename(Comps) -> + lists:append(lists:map(fun mk_filename_component/1, Comps)). -mv_temp_file({Filepath, TempFilepath}) -> +mk_filename_component(I) when is_integer(I) -> integer_to_list(I); +mk_filename_component(A) when is_atom(A) -> atom_to_list(A); +mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B); +mk_filename_component(S) when is_list(S) -> S. + +write_contents(Filepath, Content) -> + file:write_file(Filepath, Content). + +mv_temp_file(Filepath, TempFilepath) -> file:rename(TempFilepath, Filepath). touch_file(Filepath) -> @@ -398,21 +428,22 @@ filtermap_files(Fun, Dirname, Filenames) -> lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames). mk_filefrag(Dirname, Filename = ?MANIFEST) -> - mk_filefrag(Dirname, Filename, fun read_filemeta/2); + mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2); mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) -> - mk_filefrag(Dirname, Filename, fun read_segmentinfo/2); + mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2); mk_filefrag(_Dirname, _) -> false. -mk_filefrag(Dirname, Filename, Fun) -> +mk_filefrag(Dirname, Filename, Tag, Fun) -> Filepath = filename:join(Dirname, Filename), - Fileinfo = file:read_file_info(Filepath), + % TODO error handling? + {ok, Fileinfo} = file:read_file_info(Filepath), case Fun(Filename, Filepath) of {ok, Frag} -> {true, #{ path => Filepath, timestamp => Fileinfo#file_info.mtime, - fragment => Frag + fragment => {Tag, Frag} }}; {error, _Reason} -> false