fix(ft-fs): put fragments into separate directories
In order to avoid potential filename collisions.
This commit is contained in:
parent
8298236908
commit
1308fa0e6b
|
@ -49,9 +49,11 @@
|
||||||
|
|
||||||
-type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
|
-type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
|
||||||
|
|
||||||
|
-define(FRAGDIR, frags).
|
||||||
|
-define(TEMPDIR, tmp).
|
||||||
|
-define(RESULTDIR, result).
|
||||||
-define(MANIFEST, "MANIFEST.json").
|
-define(MANIFEST, "MANIFEST.json").
|
||||||
-define(SEGMENT, "SEG").
|
-define(SEGMENT, "SEG").
|
||||||
-define(TEMP, "TMP").
|
|
||||||
|
|
||||||
-type root() :: file:name().
|
-type root() :: file:name().
|
||||||
|
|
||||||
|
@ -77,7 +79,7 @@
|
||||||
% Quota? Some lower level errors?
|
% Quota? Some lower level errors?
|
||||||
{ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
|
{ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
|
||||||
store_filemeta(Storage, Transfer, Meta) ->
|
store_filemeta(Storage, Transfer, Meta) ->
|
||||||
Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
|
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
|
||||||
case read_file(Filepath, fun decode_filemeta/1) of
|
case read_file(Filepath, fun decode_filemeta/1) of
|
||||||
{ok, Meta} ->
|
{ok, Meta} ->
|
||||||
_ = touch_file(Filepath),
|
_ = touch_file(Filepath),
|
||||||
|
@ -89,7 +91,7 @@ store_filemeta(Storage, Transfer, Meta) ->
|
||||||
% about it too much now.
|
% about it too much now.
|
||||||
{error, conflict};
|
{error, conflict};
|
||||||
{error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
|
{error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
|
||||||
write_file_atomic(Filepath, encode_filemeta(Meta))
|
write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Store a segment in the backing filesystem.
|
%% Store a segment in the backing filesystem.
|
||||||
|
@ -99,15 +101,15 @@ store_filemeta(Storage, Transfer, Meta) ->
|
||||||
% Quota? Some lower level errors?
|
% Quota? Some lower level errors?
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
|
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
|
||||||
Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
|
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
|
||||||
write_file_atomic(Filepath, Content).
|
write_file_atomic(Storage, Transfer, Filepath, Content).
|
||||||
|
|
||||||
-spec list(storage(), transfer()) ->
|
-spec list(storage(), transfer()) ->
|
||||||
% Some lower level errors? {error, notfound}?
|
% Some lower level errors? {error, notfound}?
|
||||||
% Result will contain zero or only one filemeta.
|
% Result will contain zero or only one filemeta.
|
||||||
{ok, list(filefrag())} | {error, _TODO}.
|
{ok, list(filefrag())} | {error, _TODO}.
|
||||||
list(Storage, Transfer) ->
|
list(Storage, Transfer) ->
|
||||||
Dirname = mk_filedir(Storage, Transfer),
|
Dirname = mk_filedir(Storage, Transfer, [?FRAGDIR]),
|
||||||
case file:list_dir(Dirname) of
|
case file:list_dir(Dirname) of
|
||||||
{ok, Filenames} ->
|
{ok, Filenames} ->
|
||||||
{ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
|
{ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
|
||||||
|
@ -151,8 +153,8 @@ assemble(Storage, Transfer, Callback) ->
|
||||||
{ok, handle()} | {error, _TODO}.
|
{ok, handle()} | {error, _TODO}.
|
||||||
open_file(Storage, Transfer, Filemeta) ->
|
open_file(Storage, Transfer, Filemeta) ->
|
||||||
Filename = maps:get(name, Filemeta),
|
Filename = maps:get(name, Filemeta),
|
||||||
Filepath = mk_filepath(Storage, Transfer, Filename),
|
TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
|
||||||
TempFilepath = mk_temp_filepath(Filepath),
|
_ = filelib:ensure_dir(TempFilepath),
|
||||||
case file:open(TempFilepath, [write, raw]) of
|
case file:open(TempFilepath, [write, raw]) of
|
||||||
{ok, Handle} ->
|
{ok, Handle} ->
|
||||||
_ = file:truncate(Handle),
|
_ = file:truncate(Handle),
|
||||||
|
@ -174,11 +176,11 @@ write({Filepath, IoDevice, Ctx}, IoData) ->
|
||||||
-spec complete(storage(), transfer(), filemeta(), handle()) ->
|
-spec complete(storage(), transfer(), filemeta(), handle()) ->
|
||||||
ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
|
ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
|
||||||
complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
|
complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
|
||||||
TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)),
|
TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)),
|
||||||
case verify_checksum(Ctx, Filemeta) of
|
case verify_checksum(Ctx, Filemeta) of
|
||||||
ok ->
|
ok ->
|
||||||
ok = file:close(IoDevice),
|
ok = file:close(IoDevice),
|
||||||
file:rename(Filepath, TargetFilepath);
|
mv_temp_file(Filepath, TargetFilepath);
|
||||||
{error, _} = Error ->
|
{error, _} = Error ->
|
||||||
_ = discard(Handle),
|
_ = discard(Handle),
|
||||||
Error
|
Error
|
||||||
|
@ -284,11 +286,11 @@ break_segment_filename(Filename) ->
|
||||||
{error, invalid}
|
{error, invalid}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_filedir(Storage, {ClientId, FileId}) ->
|
mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
|
||||||
filename:join([get_storage_root(Storage), ClientId, FileId]).
|
filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]).
|
||||||
|
|
||||||
mk_filepath(Storage, Transfer, Filename) ->
|
mk_filepath(Storage, Transfer, SubDirs, Filename) ->
|
||||||
filename:join(mk_filedir(Storage, Transfer), Filename).
|
filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename).
|
||||||
|
|
||||||
get_storage_root(Storage) ->
|
get_storage_root(Storage) ->
|
||||||
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
|
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
|
||||||
|
@ -315,13 +317,13 @@ safe_decode(Content, DecodeFun) ->
|
||||||
{error, corrupted}
|
{error, corrupted}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
write_file_atomic(Filepath, Content) when is_binary(Content) ->
|
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
|
||||||
TempFilepath = mk_temp_filepath(Filepath),
|
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
|
||||||
Result = emqx_misc:pipeline(
|
Result = emqx_misc:pipeline(
|
||||||
[
|
[
|
||||||
fun filelib:ensure_dir/1,
|
fun filelib:ensure_dir/1,
|
||||||
fun write_contents/2,
|
fun write_contents/2,
|
||||||
fun(FP) -> mv_temp_file(Filepath, FP) end
|
fun(_) -> mv_temp_file(TempFilepath, Filepath) end
|
||||||
],
|
],
|
||||||
TempFilepath,
|
TempFilepath,
|
||||||
Content
|
Content
|
||||||
|
@ -334,11 +336,9 @@ write_file_atomic(Filepath, Content) when is_binary(Content) ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_temp_filepath(Filepath) ->
|
mk_temp_filepath(Storage, Transfer, Filename) ->
|
||||||
Dirname = filename:dirname(Filepath),
|
|
||||||
Filename = filename:basename(Filepath),
|
|
||||||
Unique = erlang:unique_integer([positive]),
|
Unique = erlang:unique_integer([positive]),
|
||||||
filename:join(Dirname, mk_filename([?TEMP, Unique, ".", Filename])).
|
filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])).
|
||||||
|
|
||||||
mk_filename(Comps) ->
|
mk_filename(Comps) ->
|
||||||
lists:append(lists:map(fun mk_filename_component/1, Comps)).
|
lists:append(lists:map(fun mk_filename_component/1, Comps)).
|
||||||
|
@ -351,7 +351,8 @@ mk_filename_component(S) when is_list(S) -> S.
|
||||||
write_contents(Filepath, Content) ->
|
write_contents(Filepath, Content) ->
|
||||||
file:write_file(Filepath, Content).
|
file:write_file(Filepath, Content).
|
||||||
|
|
||||||
mv_temp_file(Filepath, TempFilepath) ->
|
mv_temp_file(TempFilepath, Filepath) ->
|
||||||
|
_ = filelib:ensure_dir(Filepath),
|
||||||
file:rename(TempFilepath, Filepath).
|
file:rename(TempFilepath, Filepath).
|
||||||
|
|
||||||
touch_file(Filepath) ->
|
touch_file(Filepath) ->
|
||||||
|
@ -365,7 +366,8 @@ mk_filefrag(Dirname, Filename = ?MANIFEST) ->
|
||||||
mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
|
mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
|
||||||
mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
|
mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
|
||||||
mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
|
mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
|
||||||
mk_filefrag(_Dirname, _) ->
|
mk_filefrag(_Dirname, _Filename) ->
|
||||||
|
% TODO this is unexpected, worth logging?
|
||||||
false.
|
false.
|
||||||
|
|
||||||
mk_filefrag(Dirname, Filename, Tag, Fun) ->
|
mk_filefrag(Dirname, Filename, Tag, Fun) ->
|
||||||
|
@ -380,6 +382,7 @@ mk_filefrag(Dirname, Filename, Tag, Fun) ->
|
||||||
fragment => {Tag, Frag}
|
fragment => {Tag, Frag}
|
||||||
}};
|
}};
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
|
% TODO loss of information
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -133,7 +133,7 @@ t_assemble_complete_local_transfer(Config) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
|
mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
|
||||||
filename:join([?config(storage_root, Config), ClientID, FileID, Filename]).
|
filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]).
|
||||||
|
|
||||||
on_assembly_finished(Result) ->
|
on_assembly_finished(Result) ->
|
||||||
?tp(test_assembly_finished, #{result => Result}).
|
?tp(test_assembly_finished, #{result => Result}).
|
||||||
|
|
Loading…
Reference in New Issue