fix(ft-gc): ensure GC of already complete transfers
This commit is contained in:
parent
75cf562c90
commit
d36c2c2928
|
@ -267,8 +267,12 @@ lookup_assembler([Source | Sources]) ->
|
||||||
|
|
||||||
check_if_already_exported(Storage, Transfer) ->
|
check_if_already_exported(Storage, Transfer) ->
|
||||||
case files(Storage, #{transfer => Transfer}) of
|
case files(Storage, #{transfer => Transfer}) of
|
||||||
{ok, #{items := [_ | _]}} -> ok;
|
{ok, #{items := [_ | _]}} ->
|
||||||
_ -> {error, not_found}
|
% NOTE: we don't know coverage here, let's just clean up locally.
|
||||||
|
_ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]),
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
{error, not_found}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_local_assembler(Transfer) ->
|
lookup_local_assembler(Transfer) ->
|
||||||
|
|
|
@ -266,6 +266,38 @@ t_gc_incomplete_transfers(_Config) ->
|
||||||
0
|
0
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_gc_repeated_transfer(_Config) ->
|
||||||
|
{local, Storage} = emqx_ft_storage:backend(),
|
||||||
|
Transfer = {
|
||||||
|
TID = {<<"clientclient">>, mk_file_id()},
|
||||||
|
#{name => "repeat.please", segments_ttl => 10},
|
||||||
|
emqx_ft_content_gen:new({?LINE, Size = 42}, 16)
|
||||||
|
},
|
||||||
|
Size = start_transfer(Storage, Transfer),
|
||||||
|
{ok, {ok, #{stats := Stats1}}} = ?wait_async_action(
|
||||||
|
?assertEqual(ok, complete_transfer(Storage, TID, Size)),
|
||||||
|
#{?snk_kind := garbage_collection},
|
||||||
|
1000
|
||||||
|
),
|
||||||
|
Size = start_transfer(Storage, Transfer),
|
||||||
|
{ok, {ok, #{stats := Stats2}}} = ?wait_async_action(
|
||||||
|
?assertEqual(ok, complete_transfer(Storage, TID, Size)),
|
||||||
|
#{?snk_kind := garbage_collection},
|
||||||
|
1000
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#gcstats{files = 4, directories = 2},
|
||||||
|
Stats1
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#gcstats{files = 4, directories = 2},
|
||||||
|
Stats2
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
{ok, []},
|
||||||
|
emqx_ft_storage_fs:list(Storage, TID, fragment)
|
||||||
|
).
|
||||||
|
|
||||||
t_gc_handling_errors(_Config) ->
|
t_gc_handling_errors(_Config) ->
|
||||||
ok = set_gc_config(minimum_segments_ttl, 0),
|
ok = set_gc_config(minimum_segments_ttl, 0),
|
||||||
ok = set_gc_config(maximum_segments_ttl, 0),
|
ok = set_gc_config(maximum_segments_ttl, 0),
|
||||||
|
@ -349,14 +381,18 @@ complete_transfer(Storage, Transfer, Size) ->
|
||||||
complete_transfer(Storage, Transfer, Size, 100).
|
complete_transfer(Storage, Transfer, Size, 100).
|
||||||
|
|
||||||
complete_transfer(Storage, Transfer, Size, Timeout) ->
|
complete_transfer(Storage, Transfer, Size, Timeout) ->
|
||||||
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
|
case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
|
||||||
MRef = erlang:monitor(process, Pid),
|
ok ->
|
||||||
Pid ! kickoff,
|
ok;
|
||||||
receive
|
{async, Pid} ->
|
||||||
{'DOWN', MRef, process, Pid, {shutdown, Result}} ->
|
MRef = erlang:monitor(process, Pid),
|
||||||
Result
|
Pid ! kickoff,
|
||||||
after Timeout ->
|
receive
|
||||||
ct:fail("Assembler did not finish in time")
|
{'DOWN', MRef, process, Pid, {shutdown, Result}} ->
|
||||||
|
Result
|
||||||
|
after Timeout ->
|
||||||
|
ct:fail("Assembler did not finish in time")
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_file_id() ->
|
mk_file_id() ->
|
||||||
|
|
Loading…
Reference in New Issue