feat(ft-gc): treat all transfers as incomplete

Since the concept of _complete transfers_ is being split out into
the _export_ concept, we lose knowledge of completeness in the GC.
Instead of asking exporters for transfer statuses, we just treat all
transfers as incomplete when GCing.
This commit is contained in:
Andrew Mayorov 2023-03-20 15:15:43 +03:00 committed by Ilya Averyanov
parent 0d39546080
commit 4f2600b9f1
4 changed files with 121 additions and 99 deletions

View File

@ -137,20 +137,10 @@ pread(Node, Segment, St) ->
%% %%
maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) -> maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) ->
Nodes = get_coverage_nodes(St), Nodes = emqx_ft_assembly:nodes(St#st.assembly),
emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes); emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
maybe_garbage_collect({error, _}, _St) -> maybe_garbage_collect({error, _}, _St) ->
ok. ok.
%% Lists the distinct nodes that appear in the coverage of the assembly held in
%% the `assembly` field of the state record.
%% Every coverage entry is matched as a `{Node, _Segment}` pair; an entry of any
%% other shape makes the fold fun fail with `function_clause`.
get_coverage_nodes(St) ->
Coverage = emqx_ft_assembly:coverage(St#st.assembly),
%% Accumulate into an ordset so that repeated nodes collapse into one element.
ordsets:to_list(
lists:foldl(
fun({Node, _Segment}, Acc) -> ordsets:add_element(Node, Acc) end,
ordsets:new(),
Coverage
)
).
segsize(#{fragment := {segment, Info}}) -> segsize(#{fragment := {segment, Info}}) ->
maps:get(size, Info). maps:get(size, Info).

View File

@ -22,6 +22,7 @@
-export([status/1]). -export([status/1]).
-export([filemeta/1]). -export([filemeta/1]).
-export([nodes/1]).
-export([coverage/1]). -export([coverage/1]).
-export([properties/1]). -export([properties/1]).
@ -108,6 +109,24 @@ filemeta(Asm) ->
coverage(#asm{coverage = Coverage}) -> coverage(#asm{coverage = Coverage}) ->
Coverage. Coverage.
%% Returns the ordered, duplicate-free list of nodes referenced by the assembly:
%% those found in the values of the `meta` orddict together with those attached
%% to the entries folded over in the `segs` graph.
-spec nodes(t()) -> [node()].
nodes(#asm{meta = Meta, segs = Segs}) ->
    %% Shared accumulator step: insert a node into an ordset.
    AddNode = fun(Node, Acc) -> ordsets:add_element(Node, Acc) end,
    MetaNodes = orddict:fold(
        fun(_Key, {Node, _Fragment}, Acc) -> AddNode(Node, Acc) end,
        ordsets:new(),
        Meta
    ),
    SegNodes = emqx_wdgraph:fold(
        fun(_Offset, {_End, _, {Node, _Fragment}}, Acc) -> AddNode(Node, Acc) end,
        ordsets:new(),
        Segs
    ),
    ordsets:to_list(ordsets:union(MetaNodes, SegNodes)).
properties(#asm{properties = Properties}) -> properties(#asm{properties = Properties}) ->
Properties. Properties.

View File

@ -80,11 +80,15 @@ handle_call(Call, From, St) ->
?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}), ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
{noreply, St}. {noreply, St}.
% TODO handle_cast({collect, Transfer, [Node | Rest]}, St) ->
% handle_cast({collect, Transfer, [Node | Rest]}, St) -> ok = do_collect_transfer(Transfer, Node, St),
% ok = do_collect_transfer(Transfer, Node, St), case Rest of
% ok = collect(self(), Transfer, Rest), [_ | _] ->
% {noreply, St}; gen_server:cast(self(), {collect, Transfer, Rest});
[] ->
ok
end,
{noreply, St};
handle_cast(reset, St) -> handle_cast(reset, St) ->
{noreply, reset_timer(St)}; {noreply, reset_timer(St)};
handle_cast(Cast, St) -> handle_cast(Cast, St) ->
@ -95,10 +99,13 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
StNext = do_collect_garbage(St), StNext = do_collect_garbage(St),
{noreply, start_timer(StNext#st{next_gc_timer = undefined})}. {noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
% do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() -> do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
% Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()), Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
% ok = maybe_report(Stats, St), ok = maybe_report(Stats, St),
% ok. ok;
do_collect_transfer(_Transfer, _Node, _St = #st{}) ->
% TODO
ok.
maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) -> maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
do_collect_garbage(St); do_collect_garbage(St);
@ -149,21 +156,13 @@ collect_garbage(Storage, Transfers, Stats) ->
) )
). ).
try_collect_transfer(Storage, Transfer, #{status := complete}, Stats) -> try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
% File transfer is complete. % File transfer might still be incomplete.
% We should be good to delete fragments and temporary files with their respective
% directories altogether.
% TODO: file expiration
{_, Stats1} = collect_fragments(Storage, Transfer, Stats),
{_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1),
Stats2;
try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
% File transfer is still incomplete.
% Any outdated fragments and temporary files should be collectable. As a kind of % Any outdated fragments and temporary files should be collectable. As a kind of
% heuristic we only delete transfer directory itself only if it is also outdated % heuristic we only delete transfer directory itself only if it is also outdated
% _and was empty at the start of GC_, as a precaution against races between % _and was empty at the start of GC_, as a precaution against races between
% writers and GCs. % writers and GCs.
TTL = get_segments_ttl(Storage, Transfer), TTL = get_segments_ttl(Storage, TransferInfo),
Cutoff = erlang:system_time(second) - TTL, Cutoff = erlang:system_time(second) - TTL,
{FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats), {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
{TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1), {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
@ -173,7 +172,14 @@ try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
collect_transfer_directory(Storage, Transfer, Stats2); collect_transfer_directory(Storage, Transfer, Stats2);
false -> false ->
Stats2 Stats2
end. end;
try_collect_transfer(Storage, Transfer, complete, Stats) ->
% File transfer is complete.
% We should be good to delete fragments and temporary files with their respective
% directories altogether.
{_, Stats1} = collect_fragments(Storage, Transfer, Stats),
{_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1),
Stats2.
collect_fragments(Storage, Transfer, Stats) -> collect_fragments(Storage, Transfer, Stats) ->
Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment), Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),

View File

@ -45,9 +45,11 @@ init_per_testcase(TC, Config) ->
}) })
end end
), ),
ok = snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(_TC, _Config) -> end_per_testcase(_TC, _Config) ->
ok = snabbkaffe:stop(),
ok = application:stop(emqx_ft), ok = application:stop(emqx_ft),
ok. ok.
@ -126,20 +128,29 @@ t_gc_complete_transfers(_Config) ->
emqx_ft_storage_fs_gc:collect(Storage) emqx_ft_storage_fs_gc:collect(Storage)
), ),
% 2. Complete just the first transfer % 2. Complete just the first transfer
?assertEqual( {ok, {ok, Event}} = ?wait_async_action(
ok, ?assertEqual(ok, complete_transfer(Storage, T1, S1)),
complete_transfer(Storage, T1, S1) #{?snk_kind := garbage_collection},
1000
), ),
?assertMatch( ?assertMatch(
#gcstats{ #{
files = Files, stats := #gcstats{
directories = 2, files = Files,
space = Space, directories = 2,
errors = #{} = Es space = Space,
errors = #{} = Es
}
} when Files == ?NSEGS(S1, SS1) andalso Space > S1 andalso map_size(Es) == 0, } when Files == ?NSEGS(S1, SS1) andalso Space > S1 andalso map_size(Es) == 0,
emqx_ft_storage_fs_gc:collect(Storage) Event
), ),
% 3. Complete rest of transfers % 3. Complete rest of transfers
{ok, Sub} = snabbkaffe_collector:subscribe(
?match_event(#{?snk_kind := garbage_collection}),
2,
1000,
0
),
?assertEqual( ?assertEqual(
[ok, ok], [ok, ok],
emqx_misc:pmap( emqx_misc:pmap(
@ -147,18 +158,19 @@ t_gc_complete_transfers(_Config) ->
[{T2, S2}, {T3, S3}] [{T2, S2}, {T3, S3}]
) )
), ),
?assertMatch( {ok, Events} = snabbkaffe_collector:receive_events(Sub),
#gcstats{ CFiles = lists:sum([Stats#gcstats.files || #{stats := Stats} <- Events]),
files = Files, CDirectories = lists:sum([Stats#gcstats.directories || #{stats := Stats} <- Events]),
directories = 4, CSpace = lists:sum([Stats#gcstats.space || #{stats := Stats} <- Events]),
space = Space, CErrors = lists:foldl(
errors = #{} = Es fun maps:merge/2,
} when #{},
Files == (?NSEGS(S2, SS2) + ?NSEGS(S3, SS3)) andalso [Stats#gcstats.errors || #{stats := Stats} <- Events]
Space > (S2 + S3) andalso ),
map_size(Es) == 0, ?assertEqual(?NSEGS(S2, SS2) + ?NSEGS(S3, SS3), CFiles),
emqx_ft_storage_fs_gc:collect(Storage) ?assertEqual(2 + 2, CDirectories),
). ?assertMatch(Space when Space > S2 + S3, CSpace),
?assertMatch(Errors when map_size(Errors) == 0, CErrors).
t_gc_incomplete_transfers(_Config) -> t_gc_incomplete_transfers(_Config) ->
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
@ -188,52 +200,47 @@ t_gc_incomplete_transfers(_Config) ->
], ],
% 1. Start transfers, send all the segments but don't trigger completion. % 1. Start transfers, send all the segments but don't trigger completion.
_ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers), _ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
?check_trace( % 2. Enable periodic GC every 0.5 seconds.
begin ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
% 2. Enable periodic GC every 0.5 seconds. ok = emqx_ft_storage_fs_gc:reset(Storage),
ok = emqx_config:put([file_transfer, storage, gc, interval], 500), % 3. First we need the first transfer to be collected.
ok = emqx_ft_storage_fs_gc:reset(Storage), {ok, _} = ?block_until(
% 3. First we need the first transfer to be collected. #{
{ok, _} = ?block_until( ?snk_kind := garbage_collection,
#{ stats := #gcstats{
?snk_kind := garbage_collection, files = Files,
stats := #gcstats{ directories = 4,
files = Files, space = Space
directories = 4, }
space = Space } when Files == (?NSEGS(S1, SS1)) andalso Space > S1,
} 5000,
} when Files == (?NSEGS(S1, SS1)) andalso Space > S1, 0
5000, ),
0 % 4. Then the second one.
), {ok, _} = ?block_until(
% 4. Then the second one. #{
{ok, _} = ?block_until( ?snk_kind := garbage_collection,
#{ stats := #gcstats{
?snk_kind := garbage_collection, files = Files,
stats := #gcstats{ directories = 4,
files = Files, space = Space
directories = 4, }
space = Space } when Files == (?NSEGS(S2, SS2)) andalso Space > S2,
} 5000,
} when Files == (?NSEGS(S2, SS2)) andalso Space > S2, 0
5000, ),
0 % 5. Then transfers 3 and 4 because 3rd has too big TTL and 4th has no specific TTL.
), {ok, _} = ?block_until(
% 5. Then transfers 3 and 4 because 3rd has too big TTL and 4th has no specific TTL. #{
{ok, _} = ?block_until( ?snk_kind := garbage_collection,
#{ stats := #gcstats{
?snk_kind := garbage_collection, files = Files,
stats := #gcstats{ directories = 4 * 2,
files = Files, space = Space
directories = 4 * 2, }
space = Space } when Files == (?NSEGS(S3, SS3) + ?NSEGS(S4, SS4)) andalso Space > S3 + S4,
} 5000,
} when Files == (?NSEGS(S3, SS3) + ?NSEGS(S4, SS4)) andalso Space > S3 + S4, 0
5000,
0
)
end,
[]
). ).
t_gc_handling_errors(_Config) -> t_gc_handling_errors(_Config) ->