diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl
index 38ccf13ac..ff845fee9 100644
--- a/apps/emqx_ft/src/emqx_ft_assembler.erl
+++ b/apps/emqx_ft/src/emqx_ft_assembler.erl
@@ -137,20 +137,10 @@ pread(Node, Segment, St) ->
 %%
 
 maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) ->
-    Nodes = get_coverage_nodes(St),
+    Nodes = emqx_ft_assembly:nodes(St#st.assembly),
     emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
 maybe_garbage_collect({error, _}, _St) ->
     ok.
 
-get_coverage_nodes(St) ->
-    Coverage = emqx_ft_assembly:coverage(St#st.assembly),
-    ordsets:to_list(
-        lists:foldl(
-            fun({Node, _Segment}, Acc) -> ordsets:add_element(Node, Acc) end,
-            ordsets:new(),
-            Coverage
-        )
-    ).
-
 segsize(#{fragment := {segment, Info}}) ->
     maps:get(size, Info).
diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl
index bea320bbf..d0998d6ec 100644
--- a/apps/emqx_ft/src/emqx_ft_assembly.erl
+++ b/apps/emqx_ft/src/emqx_ft_assembly.erl
@@ -22,6 +22,7 @@
 
 -export([status/1]).
 -export([filemeta/1]).
+-export([nodes/1]).
 -export([coverage/1]).
 -export([properties/1]).
 
@@ -108,6 +109,24 @@ filemeta(Asm) ->
 coverage(#asm{coverage = Coverage}) ->
     Coverage.
 
+-spec nodes(t()) -> [node()].
+nodes(#asm{meta = Meta, segs = Segs}) ->
+    S1 = orddict:fold(
+        fun(_Meta, {Node, _Fragment}, Acc) ->
+            ordsets:add_element(Node, Acc)
+        end,
+        ordsets:new(),
+        Meta
+    ),
+    S2 = emqx_wdgraph:fold(
+        fun(_Offset, {_End, _, {Node, _Fragment}}, Acc) ->
+            ordsets:add_element(Node, Acc)
+        end,
+        ordsets:new(),
+        Segs
+    ),
+    ordsets:to_list(ordsets:union(S1, S2)).
+
 properties(#asm{properties = Properties}) ->
     Properties.
 
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
index 98b048108..58c5dbfdf 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
@@ -80,11 +80,15 @@ handle_call(Call, From, St) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
     {noreply, St}.
 
-% TODO
-% handle_cast({collect, Transfer, [Node | Rest]}, St) ->
-%     ok = do_collect_transfer(Transfer, Node, St),
-%     ok = collect(self(), Transfer, Rest),
-%     {noreply, St};
+handle_cast({collect, Transfer, [Node | Rest]}, St) ->
+    ok = do_collect_transfer(Transfer, Node, St),
+    case Rest of
+        [_ | _] ->
+            gen_server:cast(self(), {collect, Transfer, Rest});
+        [] ->
+            ok
+    end,
+    {noreply, St};
 handle_cast(reset, St) ->
     {noreply, reset_timer(St)};
 handle_cast(Cast, St) ->
@@ -95,10 +99,13 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
     StNext = do_collect_garbage(St),
     {noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
 
-% do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
-%     Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
-%     ok = maybe_report(Stats, St),
-%     ok.
+do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
+    Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
+    ok = maybe_report(Stats, St),
+    ok;
+do_collect_transfer(_Transfer, _Node, _St = #st{}) ->
+    % TODO
+    ok.
 
 maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
     do_collect_garbage(St);
@@ -149,21 +156,13 @@ collect_garbage(Storage, Transfers, Stats) ->
         )
     ).
 
-try_collect_transfer(Storage, Transfer, #{status := complete}, Stats) ->
-    % File transfer is complete.
-    % We should be good to delete fragments and temporary files with their respective
-    % directories altogether.
-    % TODO: file expiration
-    {_, Stats1} = collect_fragments(Storage, Transfer, Stats),
-    {_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1),
-    Stats2;
-try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
-    % File transfer is still incomplete.
+try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
+    % File transfer might still be incomplete.
     % Any outdated fragments and temporary files should be collectable. As a kind of
     % heuristic we only delete transfer directory itself only if it is also outdated
     % _and was empty at the start of GC_, as a precaution against races between
     % writers and GCs.
-    TTL = get_segments_ttl(Storage, Transfer),
+    TTL = get_segments_ttl(Storage, TransferInfo),
     Cutoff = erlang:system_time(second) - TTL,
     {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
     {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
@@ -173,7 +172,14 @@ try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) ->
             collect_transfer_directory(Storage, Transfer, Stats2);
         false ->
             Stats2
-    end.
+    end;
+try_collect_transfer(Storage, Transfer, complete, Stats) ->
+    % File transfer is complete.
+    % We should be good to delete fragments and temporary files with their respective
+    % directories altogether.
+    {_, Stats1} = collect_fragments(Storage, Transfer, Stats),
+    {_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1),
+    Stats2.
 
 collect_fragments(Storage, Transfer, Stats) ->
     Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment),
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index 37ac7eedf..89e9eb970 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -45,9 +45,11 @@ init_per_testcase(TC, Config) ->
             })
         end
     ),
+    ok = snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_TC, _Config) ->
+    ok = snabbkaffe:stop(),
     ok = application:stop(emqx_ft),
     ok.
 
@@ -126,20 +128,29 @@ t_gc_complete_transfers(_Config) ->
         emqx_ft_storage_fs_gc:collect(Storage)
     ),
     % 2. Complete just the first transfer
-    ?assertEqual(
-        ok,
-        complete_transfer(Storage, T1, S1)
+    {ok, {ok, Event}} = ?wait_async_action(
+        ?assertEqual(ok, complete_transfer(Storage, T1, S1)),
+        #{?snk_kind := garbage_collection},
+        1000
     ),
     ?assertMatch(
-        #gcstats{
-            files = Files,
-            directories = 2,
-            space = Space,
-            errors = #{} = Es
+        #{
+            stats := #gcstats{
+                files = Files,
+                directories = 2,
+                space = Space,
+                errors = #{} = Es
+            }
         } when Files == ?NSEGS(S1, SS1) andalso Space > S1 andalso map_size(Es) == 0,
-        emqx_ft_storage_fs_gc:collect(Storage)
+        Event
     ),
     % 3. Complete rest of transfers
+    {ok, Sub} = snabbkaffe_collector:subscribe(
+        ?match_event(#{?snk_kind := garbage_collection}),
+        2,
+        1000,
+        0
+    ),
     ?assertEqual(
         [ok, ok],
         emqx_misc:pmap(
@@ -147,18 +158,19 @@ t_gc_complete_transfers(_Config) ->
             [{T2, S2}, {T3, S3}]
         )
     ),
-    ?assertMatch(
-        #gcstats{
-            files = Files,
-            directories = 4,
-            space = Space,
-            errors = #{} = Es
-        } when
-            Files == (?NSEGS(S2, SS2) + ?NSEGS(S3, SS3)) andalso
-                Space > (S2 + S3) andalso
-                map_size(Es) == 0,
-        emqx_ft_storage_fs_gc:collect(Storage)
-    ).
+    {ok, Events} = snabbkaffe_collector:receive_events(Sub),
+    CFiles = lists:sum([Stats#gcstats.files || #{stats := Stats} <- Events]),
+    CDirectories = lists:sum([Stats#gcstats.directories || #{stats := Stats} <- Events]),
+    CSpace = lists:sum([Stats#gcstats.space || #{stats := Stats} <- Events]),
+    CErrors = lists:foldl(
+        fun maps:merge/2,
+        #{},
+        [Stats#gcstats.errors || #{stats := Stats} <- Events]
+    ),
+    ?assertEqual(?NSEGS(S2, SS2) + ?NSEGS(S3, SS3), CFiles),
+    ?assertEqual(2 + 2, CDirectories),
+    ?assertMatch(Space when Space > S2 + S3, CSpace),
+    ?assertMatch(Errors when map_size(Errors) == 0, CErrors).
 
 t_gc_incomplete_transfers(_Config) ->
     ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0),
@@ -188,52 +200,47 @@ t_gc_incomplete_transfers(_Config) ->
     ],
     % 1. Start transfers, send all the segments but don't trigger completion.
     _ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
-    ?check_trace(
-        begin
-            % 2. Enable periodic GC every 0.5 seconds.
-            ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
-            ok = emqx_ft_storage_fs_gc:reset(Storage),
-            % 3. First we need the first transfer to be collected.
-            {ok, _} = ?block_until(
-                #{
-                    ?snk_kind := garbage_collection,
-                    stats := #gcstats{
-                        files = Files,
-                        directories = 4,
-                        space = Space
-                    }
-                } when Files == (?NSEGS(S1, SS1)) andalso Space > S1,
-                5000,
-                0
-            ),
-            % 4. Then the second one.
-            {ok, _} = ?block_until(
-                #{
-                    ?snk_kind := garbage_collection,
-                    stats := #gcstats{
-                        files = Files,
-                        directories = 4,
-                        space = Space
-                    }
-                } when Files == (?NSEGS(S2, SS2)) andalso Space > S2,
-                5000,
-                0
-            ),
-            % 5. Then transfers 3 and 4 because 3rd has too big TTL and 4th has no specific TTL.
-            {ok, _} = ?block_until(
-                #{
-                    ?snk_kind := garbage_collection,
-                    stats := #gcstats{
-                        files = Files,
-                        directories = 4 * 2,
-                        space = Space
-                    }
-                } when Files == (?NSEGS(S3, SS3) + ?NSEGS(S4, SS4)) andalso Space > S3 + S4,
-                5000,
-                0
-            )
-        end,
-        []
+    % 2. Enable periodic GC every 0.5 seconds.
+    ok = emqx_config:put([file_transfer, storage, gc, interval], 500),
+    ok = emqx_ft_storage_fs_gc:reset(Storage),
+    % 3. First we need the first transfer to be collected.
+    {ok, _} = ?block_until(
+        #{
+            ?snk_kind := garbage_collection,
+            stats := #gcstats{
+                files = Files,
+                directories = 4,
+                space = Space
+            }
+        } when Files == (?NSEGS(S1, SS1)) andalso Space > S1,
+        5000,
+        0
+    ),
+    % 4. Then the second one.
+    {ok, _} = ?block_until(
+        #{
+            ?snk_kind := garbage_collection,
+            stats := #gcstats{
+                files = Files,
+                directories = 4,
+                space = Space
+            }
+        } when Files == (?NSEGS(S2, SS2)) andalso Space > S2,
+        5000,
+        0
+    ),
+    % 5. Then transfers 3 and 4 because 3rd has too big TTL and 4th has no specific TTL.
+    {ok, _} = ?block_until(
+        #{
+            ?snk_kind := garbage_collection,
+            stats := #gcstats{
+                files = Files,
+                directories = 4 * 2,
+                space = Space
+            }
+        } when Files == (?NSEGS(S3, SS3) + ?NSEGS(S4, SS4)) andalso Space > S3 + S4,
+        5000,
+        0
     ).
 
 t_gc_handling_errors(_Config) ->