diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index e2bda8fcb..cc9d94b35 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -49,9 +49,11 @@ -type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}). +-define(FRAGDIR, frags). +-define(TEMPDIR, tmp). +-define(RESULTDIR, result). -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). --define(TEMP, "TMP"). -type root() :: file:name(). @@ -77,7 +79,7 @@ % Quota? Some lower level errors? {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}. store_filemeta(Storage, Transfer, Meta) -> - Filepath = mk_filepath(Storage, Transfer, ?MANIFEST), + Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> _ = touch_file(Filepath), @@ -89,7 +91,7 @@ store_filemeta(Storage, Transfer, Meta) -> % about it too much now. {error, conflict}; {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent -> - write_file_atomic(Filepath, encode_filemeta(Meta)) + write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta)) end. %% Store a segment in the backing filesystem. @@ -99,15 +101,15 @@ store_filemeta(Storage, Transfer, Meta) -> % Quota? Some lower level errors? ok | {error, _TODO}. store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> - Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)), - write_file_atomic(Filepath, Content). + Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), + write_file_atomic(Storage, Transfer, Filepath, Content). -spec list(storage(), transfer()) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. {ok, list(filefrag())} | {error, _TODO}. list(Storage, Transfer) -> - Dirname = mk_filedir(Storage, Transfer), + Dirname = mk_filedir(Storage, Transfer, [?FRAGDIR]), case file:list_dir(Dirname) of {ok, Filenames} -> {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)}; @@ -151,8 +153,8 @@ assemble(Storage, Transfer, Callback) -> {ok, handle()} | {error, _TODO}. open_file(Storage, Transfer, Filemeta) -> Filename = maps:get(name, Filemeta), - Filepath = mk_filepath(Storage, Transfer, Filename), - TempFilepath = mk_temp_filepath(Filepath), + TempFilepath = mk_temp_filepath(Storage, Transfer, Filename), + _ = filelib:ensure_dir(TempFilepath), case file:open(TempFilepath, [write, raw]) of {ok, Handle} -> _ = file:truncate(Handle), @@ -174,11 +176,11 @@ write({Filepath, IoDevice, Ctx}, IoData) -> -spec complete(storage(), transfer(), filemeta(), handle()) -> ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}. complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) -> - TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)), + TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)), case verify_checksum(Ctx, Filemeta) of ok -> ok = file:close(IoDevice), - file:rename(Filepath, TargetFilepath); + mv_temp_file(Filepath, TargetFilepath); {error, _} = Error -> _ = discard(Handle), Error @@ -284,11 +286,11 @@ break_segment_filename(Filename) -> {error, invalid} end. -mk_filedir(Storage, {ClientId, FileId}) -> - filename:join([get_storage_root(Storage), ClientId, FileId]). +mk_filedir(Storage, {ClientId, FileId}, SubDirs) -> + filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]). -mk_filepath(Storage, Transfer, Filename) -> - filename:join(mk_filedir(Storage, Transfer), Filename). +mk_filepath(Storage, Transfer, SubDirs, Filename) -> + filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename). get_storage_root(Storage) -> maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). @@ -315,13 +317,13 @@ safe_decode(Content, DecodeFun) -> {error, corrupted} end. -write_file_atomic(Filepath, Content) when is_binary(Content) -> - TempFilepath = mk_temp_filepath(Filepath), +write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) -> + TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)), Result = emqx_misc:pipeline( [ fun filelib:ensure_dir/1, fun write_contents/2, - fun(FP) -> mv_temp_file(Filepath, FP) end + fun(_) -> mv_temp_file(TempFilepath, Filepath) end ], TempFilepath, Content @@ -334,11 +336,9 @@ write_file_atomic(Filepath, Content) when is_binary(Content) -> {error, Reason} end. -mk_temp_filepath(Filepath) -> - Dirname = filename:dirname(Filepath), - Filename = filename:basename(Filepath), +mk_temp_filepath(Storage, Transfer, Filename) -> Unique = erlang:unique_integer([positive]), - filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])). + filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])). mk_filename(Comps) -> lists:append(lists:map(fun mk_filename_component/1, Comps)). @@ -351,7 +351,8 @@ mk_filename_component(S) when is_list(S) -> S. write_contents(Filepath, Content) -> file:write_file(Filepath, Content). -mv_temp_file(Filepath, TempFilepath) -> +mv_temp_file(TempFilepath, Filepath) -> + _ = filelib:ensure_dir(Filepath), file:rename(TempFilepath, Filepath). touch_file(Filepath) -> @@ -365,7 +366,8 @@ mk_filefrag(Dirname, Filename = ?MANIFEST) -> mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2); mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) -> mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2); -mk_filefrag(_Dirname, _) -> +mk_filefrag(_Dirname, _Filename) -> + % TODO this is unexpected, worth logging? false. mk_filefrag(Dirname, Filename, Tag, Fun) -> @@ -380,6 +382,7 @@ mk_filefrag(Dirname, Filename, Tag, Fun) -> fragment => {Tag, Frag} }}; {error, _Reason} -> + % TODO loss of information false end. diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 9ec2fce61..fdfdd432e 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -133,7 +133,7 @@ t_assemble_complete_local_transfer(Config) -> ). mk_assembly_filename(Config, {ClientID, FileID}, Filename) -> - filename:join([?config(storage_root, Config), ClientID, FileID, Filename]). + filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]). on_assembly_finished(Result) -> ?tp(test_assembly_finished, #{result => Result}).