File-transfer GC: make collection of (empty) transfer directories filter-aware.

Changes visible in the hunks below:
  * collect_empty_directory/3 stats the directory with file:read_link_info/2 and
    delegates to collect_empty_directory/4, which applies the filter before calling
    file:del_dir/1. Both now return {Collected, Stats} instead of bare Stats.
  * collect_transfer_directory/4 takes Cutoff: when the incoming stats report zero
    collected directories it filters by mtime via filter_older_than/1, otherwise it
    collects unconditionally. Parents are visited only when
    collect_empty_directory/3 returns true.
  * filter_older_than/1 replaces the two duplicated inline mtime filters; note that
    the comparison changes from `<` to `=<`.
  * The commented-out collect_outdated_fragment/3 is removed.
  * t_gc_complete_transfers sets the GC TTLs and interval, calls
    emqx_ft_storage_fs_gc:reset/1 and waits for a garbage_collection event that
    reports 6 collected directories and no files.

NOTE(review): leading whitespace inside the hunks follows the usual 4-space Erlang
layout; confirm against the target tree (or apply with whitespace tolerance).

diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
index 20e4f468d..eb274a4d5 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
@@ -169,7 +169,7 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
     % TODO: collect empty directories separately
     case FragCleaned and TempCleaned of
         true ->
-            collect_transfer_directory(Storage, Transfer, Stats2);
+            collect_transfer_directory(Storage, Transfer, Cutoff, Stats2);
         false ->
             Stats2
     end;
@@ -191,18 +191,32 @@ collect_tempfiles(Storage, Transfer, Stats) ->
 
 collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
-    Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
-    maybe_collect_directory(Dirname, Filter, Stats).
+    maybe_collect_directory(Dirname, filter_older_than(Cutoff), Stats).
 
 collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary),
-    Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end,
-    maybe_collect_directory(Dirname, Filter, Stats).
+    maybe_collect_directory(Dirname, filter_older_than(Cutoff), Stats).
 
-collect_transfer_directory(Storage, Transfer, Stats) ->
+collect_transfer_directory(Storage, Transfer, Cutoff, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer),
-    StatsNext = collect_empty_directory(Dirname, Stats),
-    collect_parents(Dirname, get_storage_root(Storage), StatsNext).
+    Filter =
+        case Stats of
+            #gcstats{directories = 0} ->
+                % Nothing was collected, this is a leftover from a past complete transfer GC.
+                filter_older_than(Cutoff);
+            #gcstats{} ->
+                % Usual incomplete transfer GC, collect directories unconditionally.
+                true
+        end,
+    case collect_empty_directory(Dirname, Filter, Stats) of
+        {true, StatsNext} ->
+            collect_parents(Dirname, get_storage_root(Storage), StatsNext);
+        {false, StatsNext} ->
+            StatsNext
+    end.
+
+filter_older_than(Cutoff) ->
+    fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt =< Cutoff end.
 
 collect_parents(Dirname, Until, Stats) ->
     Parent = filename:dirname(Dirname),
@@ -218,14 +232,6 @@ collect_parents(Dirname, Until, Stats) ->
             register_gcstat_error({directory, Parent}, Reason, Stats)
     end.
 
-% collect_outdated_fragment(#{path := Filepath, fileinfo := Fileinfo}, Cutoff, Stats) ->
-%     case Fileinfo#file_info.mtime of
-%         ModifiedAt when ModifiedAt < Cutoff ->
-%             collect_filepath(Filepath, Fileinfo, Stats);
-%         _ ->
-%             Stats
-%     end.
-
 maybe_collect_directory(Dirpath, Filter, Stats) ->
     case filelib:is_dir(Dirpath) of
         true ->
@@ -263,10 +269,10 @@ collect_directory(Dirpath, Fileinfo, Filter, Stats) ->
     case file:list_dir(Dirpath) of
         {ok, Filenames} ->
             {Clean, StatsNext} = collect_files(Dirpath, Filenames, Filter, Stats),
-            case Clean andalso filter_filepath(Filter, Dirpath, Fileinfo) of
+            case Clean of
                 true ->
-                    {true, collect_empty_directory(Dirpath, StatsNext)};
-                _ ->
+                    collect_empty_directory(Dirpath, Fileinfo, Filter, StatsNext);
+                false ->
                     {false, StatsNext}
             end;
         {error, Reason} ->
@@ -284,13 +290,23 @@ collect_files(Dirname, Filenames, Filter, Stats) ->
         Filenames
     ).
 
-collect_empty_directory(Dirpath, Stats) ->
-    case file:del_dir(Dirpath) of
+collect_empty_directory(Dirpath, Filter, Stats) ->
+    case file:read_link_info(Dirpath, [{time, posix}, raw]) of
+        {ok, Dirinfo} ->
+            collect_empty_directory(Dirpath, Dirinfo, Filter, Stats);
+        {error, Reason} ->
+            {Reason == enoent, register_gcstat_error({directory, Dirpath}, Reason, Stats)}
+    end.
+
+collect_empty_directory(Dirpath, Dirinfo, Filter, Stats) ->
+    case filter_filepath(Filter, Dirpath, Dirinfo) andalso file:del_dir(Dirpath) of
+        false ->
+            {false, Stats};
         ok ->
             ?tp(garbage_collected_directory, #{path => Dirpath}),
-            account_gcstat_directory(Stats);
+            {true, account_gcstat_directory(Stats)};
         {error, Reason} ->
-            register_gcstat_error({directory, Dirpath}, Reason, Stats)
+            {false, register_gcstat_error({directory, Dirpath}, Reason, Stats)}
     end.
 
 filter_filepath(Filter, _, _) when is_boolean(Filter) ->
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index 25cd200f1..53a22b8b6 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -104,6 +104,10 @@ t_gc_triggers_manually(_Config) ->
 
 t_gc_complete_transfers(_Config) ->
     Storage = emqx_ft_conf:storage(),
+    ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
+    ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 3),
+    ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
+    ok = emqx_ft_storage_fs_gc:reset(Storage),
     Transfers = [
         {
             T1 = {<<"client1">>, mk_file_id()},
@@ -174,7 +178,20 @@ t_gc_complete_transfers(_Config) ->
     ?assertEqual(?NSEGS(S2, SS2) + ?NSEGS(S3, SS3), CFiles),
     ?assertEqual(2 + 2, CDirectories),
     ?assertMatch(Space when Space > S2 + S3, CSpace),
-    ?assertMatch(Errors when map_size(Errors) == 0, CErrors).
+    ?assertMatch(Errors when map_size(Errors) == 0, CErrors),
+    % 4. Ensure that empty transfer directories will be eventually collected
+    {ok, _} = ?block_until(
+        #{
+            ?snk_kind := garbage_collection,
+            stats := #gcstats{
+                files = 0,
+                directories = 6,
+                space = 0
+            }
+        },
+        5000,
+        0
+    ).
 
 t_gc_incomplete_transfers(_Config) ->
     ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),