diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 010d004a1..7a0a6b3b4 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -267,8 +267,12 @@ lookup_assembler([Source | Sources]) -> check_if_already_exported(Storage, Transfer) -> case files(Storage, #{transfer => Transfer}) of - {ok, #{items := [_ | _]}} -> ok; - _ -> {error, not_found} + {ok, #{items := [_ | _]}} -> + % NOTE: we don't know coverage here, let's just clean up locally. + _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]), + ok; + _ -> + {error, not_found} end. lookup_local_assembler(Transfer) -> diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index a7ffd5675..842ae6bad 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -266,6 +266,38 @@ t_gc_incomplete_transfers(_Config) -> 0 ). +t_gc_repeated_transfer(_Config) -> + {local, Storage} = emqx_ft_storage:backend(), + Transfer = { + TID = {<<"clientclient">>, mk_file_id()}, + #{name => "repeat.please", segments_ttl => 10}, + emqx_ft_content_gen:new({?LINE, Size = 42}, 16) + }, + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats1}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats2}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats1 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats2 + ), + ?assertEqual( + {ok, []}, + emqx_ft_storage_fs:list(Storage, TID, fragment) + ). + t_gc_handling_errors(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 0), @@ -349,14 +381,18 @@ complete_transfer(Storage, Transfer, Size) -> complete_transfer(Storage, Transfer, Size, 100). complete_transfer(Storage, Transfer, Size, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), - MRef = erlang:monitor(process, Pid), - Pid ! kickoff, - receive - {'DOWN', MRef, process, Pid, {shutdown, Result}} -> - Result - after Timeout -> - ct:fail("Assembler did not finish in time") + case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of + ok -> + ok; + {async, Pid} -> + MRef = erlang:monitor(process, Pid), + Pid ! kickoff, + receive + {'DOWN', MRef, process, Pid, {shutdown, Result}} -> + Result + after Timeout -> + ct:fail("Assembler did not finish in time") + end end. mk_file_id() ->