fix(ft-gc): ensure directories of complete transfers are GCed
This commit is contained in:
parent
2707b4500f
commit
64f15f1fdb
|
@ -169,7 +169,7 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
|
||||||
% TODO: collect empty directories separately
|
% TODO: collect empty directories separately
|
||||||
case FragCleaned and TempCleaned of
|
case FragCleaned and TempCleaned of
|
||||||
true ->
|
true ->
|
||||||
collect_transfer_directory(Storage, Transfer, Stats2);
|
collect_transfer_directory(Storage, Transfer, Cutoff, Stats2);
|
||||||
false ->
|
false ->
|
||||||
Stats2
|
Stats2
|
||||||
end;
|
end;
|
||||||
|
@ -191,18 +191,32 @@ collect_tempfiles(Storage, Transfer, Stats) ->
|
||||||
|
|
||||||
collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) ->
|
collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) ->
|
||||||
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
|
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
|
||||||
Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
|
maybe_collect_directory(Dirname, filter_older_than(Cutoff), Stats).
|
||||||
maybe_collect_directory(Dirname, Filter, Stats).
|
|
||||||
|
|
||||||
collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) ->
|
collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) ->
|
||||||
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
|
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
|
||||||
Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
|
maybe_collect_directory(Dirname, filter_older_than(Cutoff), Stats).
|
||||||
maybe_collect_directory(Dirname, Filter, Stats).
|
|
||||||
|
|
||||||
collect_transfer_directory(Storage, Transfer, Stats) ->
|
collect_transfer_directory(Storage, Transfer, Cutoff, Stats) ->
|
||||||
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer),
|
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer),
|
||||||
StatsNext = collect_empty_directory(Dirname, Stats),
|
Filter =
|
||||||
collect_parents(Dirname, get_storage_root(Storage), StatsNext).
|
case Stats of
|
||||||
|
#gcstats{directories = 0} ->
|
||||||
|
% Nothing were collected, this is a leftover from a past complete transfer GC.
|
||||||
|
filter_older_than(Cutoff);
|
||||||
|
#gcstats{} ->
|
||||||
|
% Usual incomplete transfer GC, collect directories unconditionally.
|
||||||
|
true
|
||||||
|
end,
|
||||||
|
case collect_empty_directory(Dirname, Filter, Stats) of
|
||||||
|
{true, StatsNext} ->
|
||||||
|
collect_parents(Dirname, get_storage_root(Storage), StatsNext);
|
||||||
|
{false, StatsNext} ->
|
||||||
|
StatsNext
|
||||||
|
end.
|
||||||
|
|
||||||
|
filter_older_than(Cutoff) ->
|
||||||
|
fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt =< Cutoff end.
|
||||||
|
|
||||||
collect_parents(Dirname, Until, Stats) ->
|
collect_parents(Dirname, Until, Stats) ->
|
||||||
Parent = filename:dirname(Dirname),
|
Parent = filename:dirname(Dirname),
|
||||||
|
@ -218,14 +232,6 @@ collect_parents(Dirname, Until, Stats) ->
|
||||||
register_gcstat_error({directory, Parent}, Reason, Stats)
|
register_gcstat_error({directory, Parent}, Reason, Stats)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
% collect_outdated_fragment(#{path := Filepath, fileinfo := Fileinfo}, Cutoff, Stats) ->
|
|
||||||
% case Fileinfo#file_info.mtime of
|
|
||||||
% ModifiedAt when ModifiedAt < Cutoff ->
|
|
||||||
% collect_filepath(Filepath, Fileinfo, Stats);
|
|
||||||
% _ ->
|
|
||||||
% Stats
|
|
||||||
% end.
|
|
||||||
|
|
||||||
maybe_collect_directory(Dirpath, Filter, Stats) ->
|
maybe_collect_directory(Dirpath, Filter, Stats) ->
|
||||||
case filelib:is_dir(Dirpath) of
|
case filelib:is_dir(Dirpath) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -263,10 +269,10 @@ collect_directory(Dirpath, Fileinfo, Filter, Stats) ->
|
||||||
case file:list_dir(Dirpath) of
|
case file:list_dir(Dirpath) of
|
||||||
{ok, Filenames} ->
|
{ok, Filenames} ->
|
||||||
{Clean, StatsNext} = collect_files(Dirpath, Filenames, Filter, Stats),
|
{Clean, StatsNext} = collect_files(Dirpath, Filenames, Filter, Stats),
|
||||||
case Clean andalso filter_filepath(Filter, Dirpath, Fileinfo) of
|
case Clean of
|
||||||
true ->
|
true ->
|
||||||
{true, collect_empty_directory(Dirpath, StatsNext)};
|
collect_empty_directory(Dirpath, Fileinfo, Filter, StatsNext);
|
||||||
_ ->
|
false ->
|
||||||
{false, StatsNext}
|
{false, StatsNext}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -284,13 +290,23 @@ collect_files(Dirname, Filenames, Filter, Stats) ->
|
||||||
Filenames
|
Filenames
|
||||||
).
|
).
|
||||||
|
|
||||||
collect_empty_directory(Dirpath, Stats) ->
|
collect_empty_directory(Dirpath, Filter, Stats) ->
|
||||||
case file:del_dir(Dirpath) of
|
case file:read_link_info(Dirpath, [{time, posix}, raw]) of
|
||||||
|
{ok, Dirinfo} ->
|
||||||
|
collect_empty_directory(Dirpath, Dirinfo, Filter, Stats);
|
||||||
|
{error, Reason} ->
|
||||||
|
{Reason == enoent, register_gcstat_error({directory, Dirpath}, Reason, Stats)}
|
||||||
|
end.
|
||||||
|
|
||||||
|
collect_empty_directory(Dirpath, Dirinfo, Filter, Stats) ->
|
||||||
|
case filter_filepath(Filter, Dirpath, Dirinfo) andalso file:del_dir(Dirpath) of
|
||||||
|
false ->
|
||||||
|
{false, Stats};
|
||||||
ok ->
|
ok ->
|
||||||
?tp(garbage_collected_directory, #{path => Dirpath}),
|
?tp(garbage_collected_directory, #{path => Dirpath}),
|
||||||
account_gcstat_directory(Stats);
|
{true, account_gcstat_directory(Stats)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
register_gcstat_error({directory, Dirpath}, Reason, Stats)
|
{false, register_gcstat_error({directory, Dirpath}, Reason, Stats)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
filter_filepath(Filter, _, _) when is_boolean(Filter) ->
|
filter_filepath(Filter, _, _) when is_boolean(Filter) ->
|
||||||
|
|
|
@ -104,6 +104,10 @@ t_gc_triggers_manually(_Config) ->
|
||||||
|
|
||||||
t_gc_complete_transfers(_Config) ->
|
t_gc_complete_transfers(_Config) ->
|
||||||
Storage = emqx_ft_conf:storage(),
|
Storage = emqx_ft_conf:storage(),
|
||||||
|
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
|
||||||
|
ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 3),
|
||||||
|
ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
|
||||||
|
ok = emqx_ft_storage_fs_gc:reset(Storage),
|
||||||
Transfers = [
|
Transfers = [
|
||||||
{
|
{
|
||||||
T1 = {<<"client1">>, mk_file_id()},
|
T1 = {<<"client1">>, mk_file_id()},
|
||||||
|
@ -174,7 +178,20 @@ t_gc_complete_transfers(_Config) ->
|
||||||
?assertEqual(?NSEGS(S2, SS2) + ?NSEGS(S3, SS3), CFiles),
|
?assertEqual(?NSEGS(S2, SS2) + ?NSEGS(S3, SS3), CFiles),
|
||||||
?assertEqual(2 + 2, CDirectories),
|
?assertEqual(2 + 2, CDirectories),
|
||||||
?assertMatch(Space when Space > S2 + S3, CSpace),
|
?assertMatch(Space when Space > S2 + S3, CSpace),
|
||||||
?assertMatch(Errors when map_size(Errors) == 0, CErrors).
|
?assertMatch(Errors when map_size(Errors) == 0, CErrors),
|
||||||
|
% 4. Ensure that empty transfer directories will be eventually collected
|
||||||
|
{ok, _} = ?block_until(
|
||||||
|
#{
|
||||||
|
?snk_kind := garbage_collection,
|
||||||
|
stats := #gcstats{
|
||||||
|
files = 0,
|
||||||
|
directories = 6,
|
||||||
|
space = 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
5000,
|
||||||
|
0
|
||||||
|
).
|
||||||
|
|
||||||
t_gc_incomplete_transfers(_Config) ->
|
t_gc_incomplete_transfers(_Config) ->
|
||||||
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
|
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
|
||||||
|
|
Loading…
Reference in New Issue