diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 5dd587a4e..16920d60b 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -30,6 +30,7 @@ start_apps/1, start_apps/2, start_apps/3, + start_app/2, stop_apps/1, reload/2, app_path/2, diff --git a/apps/emqx_ft/include/emqx_ft_storage_fs.hrl b/apps/emqx_ft/include/emqx_ft_storage_fs.hrl new file mode 100644 index 000000000..72ebe586a --- /dev/null +++ b/apps/emqx_ft/include/emqx_ft_storage_fs.hrl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_FT_STORAGE_FS_HRL). +-define(EMQX_FT_STORAGE_FS_HRL, true). + +-record(gcstats, { + started_at :: integer(), + finished_at :: integer() | undefined, + files = 0 :: non_neg_integer(), + directories = 0 :: non_neg_integer(), + space = 0 :: non_neg_integer(), + errors = #{} :: #{_GCSubject => {error, _}} +}). + +-endif. diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 9e3ddfcbd..38ccf13ac 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -126,6 +126,7 @@ handle_event(internal, _, {assemble, []}, St = #st{}) -> handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), + ok = maybe_garbage_collect(Result, St), {stop, {shutdown, Result}}. pread(Node, Segment, St) when Node =:= node() -> @@ -135,5 +136,21 @@ pread(Node, Segment, St) -> %% +maybe_garbage_collect(ok, St = #st{storage = Storage, transfer = Transfer}) -> + Nodes = get_coverage_nodes(St), + emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes); +maybe_garbage_collect({error, _}, _St) -> + ok. + +get_coverage_nodes(St) -> + Coverage = emqx_ft_assembly:coverage(St#st.assembly), + ordsets:to_list( + lists:foldl( + fun({Node, _Segment}, Acc) -> ordsets:add_element(Node, Acc) end, + ordsets:new(), + Coverage + ) + ). + segsize(#{fragment := {segment, Info}}) -> maps:get(size, Info). diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index d56dd8d32..444462716 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -20,6 +20,11 @@ -behaviour(emqx_config_handler). +%% Accessors +-export([storage/0]). +-export([gc_interval/1]). +-export([segments_ttl/1]). + %% Load/Unload -export([ load/0, @@ -32,6 +37,30 @@ post_config_update/5 ]). +-type milliseconds() :: non_neg_integer(). +-type seconds() :: non_neg_integer(). + +%%-------------------------------------------------------------------- +%% Accessors +%%-------------------------------------------------------------------- + +-spec storage() -> _Storage | disabled. +storage() -> + emqx_config:get([file_transfer, storage], disabled). + +-spec gc_interval(_Storage) -> milliseconds(). +gc_interval(_Storage) -> + % TODO: config wiring + application:get_env(emqx_ft, gc_interval, timer:minutes(10)). + +-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}. +segments_ttl(_Storage) -> + % TODO: config wiring + { + application:get_env(emqx_ft, min_segments_ttl, 60), + application:get_env(emqx_ft, max_segments_ttl, 72 * 3600) + }. + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 7a95a0454..0b8c38736 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -18,6 +18,8 @@ -export( [ + child_spec/0, + store_filemeta/2, store_segment/2, assemble/2, @@ -64,6 +66,17 @@ %% API %%-------------------------------------------------------------------- +-spec child_spec() -> + [supervisor:child_spec()]. +child_spec() -> + try + Mod = mod(), + Mod:child_spec(storage()) + catch + error:disabled -> []; + error:undef -> [] + end. + -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> ok | {async, pid()} | {error, term()}. store_filemeta(Transfer, FileMeta) -> @@ -99,6 +112,8 @@ with_storage_type(Type, Fun, Args) -> #{type := Type} -> Mod = mod(Storage), apply(Mod, Fun, [Storage | Args]); + disabled -> + {error, disabled}; _ -> {error, {invalid_storage_type, Type}} end. @@ -108,7 +123,7 @@ with_storage_type(Type, Fun, Args) -> %%-------------------------------------------------------------------- storage() -> - emqx_config:get([file_transfer, storage]). + emqx_ft_conf:storage(). mod() -> mod(storage()). @@ -116,6 +131,8 @@ mod() -> mod(Storage) -> case Storage of #{type := local} -> - emqx_ft_storage_fs + emqx_ft_storage_fs; + disabled -> + error(disabled) % emqx_ft_storage_dummy end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 2cc19d2a2..b8aef5276 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -14,6 +14,12 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% Filesystem storage backend +%% +%% NOTE +%% If you plan to change storage layout please consult `emqx_ft_storage_fs_gc` +%% to see how much it would break or impair GC. + -module(emqx_ft_storage_fs). -behaviour(emqx_ft_storage). @@ -21,14 +27,22 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-export([child_spec/1]). + -export([store_filemeta/3]). -export([store_segment/3]). +-export([read_filemeta/2]). -export([list/3]). -export([pread/5]). -export([assemble/3]). -export([transfers/1]). +% GC API +% TODO: This is quickly becomes hairy. +-export([get_subdir/2]). +-export([get_subdir/3]). + -export([ready_transfers_local/1]). -export([get_ready_transfer_local/3]). @@ -40,6 +54,7 @@ -export([write/2]). -export([discard/1]). +-export_type([storage/0]). -export_type([filefrag/1]). -export_type([filefrag/0]). -export_type([transferinfo/0]). @@ -79,7 +94,9 @@ -define(MANIFEST, "MANIFEST.json"). -define(SEGMENT, "SEG"). --type storage() :: emqx_ft_storage:storage(). +-type storage() :: #{ + root => file:name() +}. -type file_error() :: file:posix() @@ -88,13 +105,25 @@ %% System limit (e.g. number of ports) reached. | system_limit. +%% Related resources childspecs +-spec child_spec(storage()) -> + [supervisor:child_spec()]. +child_spec(Storage) -> + [ + #{ + id => emqx_ft_storage_fs_gc, + start => {emqx_ft_storage_fs_gc, start_link, [Storage]}, + restart => permanent + } + ]. + %% Store manifest in the backing filesystem. %% Atomic operation. -spec store_filemeta(storage(), transfer(), filemeta()) -> % Quota? Some lower level errors? ok | {error, conflict} | {error, file_error()}. store_filemeta(Storage, Transfer, Meta) -> - Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), + Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST), case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> _ = touch_file(Filepath), @@ -119,9 +148,16 @@ store_filemeta(Storage, Transfer, Meta) -> % Quota? Some lower level errors? ok | {error, file_error()}. store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> - Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)), + Filename = mk_segment_filename(Segment), + Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), Filename), write_file_atomic(Storage, Transfer, Filepath, Content). +-spec read_filemeta(storage(), transfer()) -> + {ok, filefrag({filemeta, filemeta()})} | {error, corrupted} | {error, file_error()}. +read_filemeta(Storage, Transfer) -> + Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST), + read_file(Filepath, fun decode_filemeta/1). + -spec list(storage(), transfer(), _What :: fragment | result) -> % Some lower level errors? {error, notfound}? % Result will contain zero or only one filemeta. @@ -143,11 +179,6 @@ list(Storage, Transfer, What) -> Error end. -get_subdirs_for(fragment) -> - [?FRAGDIR]; -get_subdirs_for(result) -> - [?RESULTDIR]. - get_filefrag_fun_for(fragment) -> fun mk_filefrag/2; get_filefrag_fun_for(result) -> @@ -329,6 +360,23 @@ read_transferinfo(Storage, Transfer, Acc) -> Acc end. +-spec get_subdir(storage(), transfer()) -> + file:name(). +get_subdir(Storage, Transfer) -> + mk_filedir(Storage, Transfer, []). + +-spec get_subdir(storage(), transfer(), fragment | temporary | result) -> + file:name(). +get_subdir(Storage, Transfer, What) -> + mk_filedir(Storage, Transfer, get_subdirs_for(What)). + +get_subdirs_for(fragment) -> + [?FRAGDIR]; +get_subdirs_for(temporary) -> + [?TEMPDIR]; +get_subdirs_for(result) -> + [?RESULTDIR]. + %% -type handle() :: {file:name(), io:device(), crypto:hash_state()}. @@ -341,7 +389,7 @@ open_file(Storage, Transfer, Filemeta) -> _ = filelib:ensure_dir(TempFilepath), case file:open(TempFilepath, [write, raw, binary]) of {ok, Handle} -> - _ = file:truncate(Handle), + % TODO: preserve filemeta {ok, {TempFilepath, Handle, init_checksum(Filemeta)}}; {error, _} = Error -> Error @@ -359,8 +407,8 @@ write({Filepath, IoDevice, Ctx}, IoData) -> -spec complete(storage(), transfer(), filemeta(), handle()) -> ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. -complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) -> - TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)), +complete(Storage, Transfer, Filemeta = #{name := Filename}, Handle = {Filepath, IoDevice, Ctx}) -> + TargetFilepath = mk_filepath(Storage, Transfer, get_subdirs_for(result), Filename), case verify_checksum(Ctx, Filemeta) of ok -> ok = file:close(IoDevice), @@ -491,7 +539,7 @@ write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) mk_temp_filepath(Storage, Transfer, Filename) -> Unique = erlang:unique_integer([positive]), - filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])). + filename:join(get_subdir(Storage, Transfer, temporary), mk_filename([Unique, ".", Filename])). mk_filename(Comps) -> lists:append(lists:map(fun mk_filename_component/1, Comps)). @@ -516,9 +564,9 @@ filtermap_files(Fun, Dirname, Filenames) -> lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames). mk_filefrag(Dirname, Filename = ?MANIFEST) -> - mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2); + mk_filefrag(Dirname, Filename, filemeta, fun read_frag_filemeta/2); mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) -> - mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2); + mk_filefrag(Dirname, Filename, segment, fun read_frag_segmentinfo/2); mk_filefrag(_Dirname, _Filename) -> ?tp(warning, "rogue_file_found", #{ directory => _Dirname, @@ -554,10 +602,10 @@ mk_filefrag(Dirname, Filename, Tag, Fun) -> false end. -read_filemeta(_Filename, Filepath) -> +read_frag_filemeta(_Filename, Filepath) -> read_file(Filepath, fun decode_filemeta/1). -read_segmentinfo(Filename, _Filepath) -> +read_frag_segmentinfo(Filename, _Filepath) -> break_segment_filename(Filename). filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S); diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl new file mode 100644 index 000000000..6d493337a --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -0,0 +1,337 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% Filesystem storage GC +%% +%% This is conceptually a part of the Filesystem storage backend, even +%% though it's tied to the backend module with somewhat narrow interface. + +-module(emqx_ft_storage_fs_gc). + +-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +-export([start_link/1]). + +-export([collect/1]). +-export([collect/3]). +-export([reset/1]). + +-behaviour(gen_server). +-export([init/1]). +-export([handle_call/3]). +-export([handle_cast/2]). +-export([handle_info/2]). + +-record(st, { + storage :: emqx_ft_storage_fs:storage(), + next_gc_timer :: maybe(reference()), + last_gc :: maybe(gcstats()) +}). + +-type gcstats() :: #gcstats{}. + +%% + +start_link(Storage) -> + gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []). + +-spec collect(emqx_ft_storage_fs:storage()) -> gcstats(). +collect(Storage) -> + gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity). + +-spec reset(emqx_ft_storage_fs:storage()) -> ok. +reset(Storage) -> + gen_server:cast(mk_server_ref(Storage), reset). + +collect(Storage, Transfer, Nodes) -> + gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}). + +mk_server_ref(Storage) -> + % TODO + {via, gproc, {n, l, {?MODULE, get_storage_root(Storage)}}}. + +%% + +init(Storage) -> + St = #st{storage = Storage}, + {ok, start_timer(St)}. + +handle_call({collect, CalledAt}, _From, St) -> + StNext = maybe_collect_garbage(CalledAt, St), + {reply, StNext#st.last_gc, StNext}; +handle_call(Call, From, St) -> + ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}), + {noreply, St}. + +% TODO +% handle_cast({collect, Transfer, [Node | Rest]}, St) -> +% ok = do_collect_transfer(Transfer, Node, St), +% ok = collect(self(), Transfer, Rest), +% {noreply, St}; +handle_cast(reset, St) -> + {noreply, reset_timer(St)}; +handle_cast(Cast, St) -> + ?SLOG(error, #{msg => "unexpected_cast", cast => Cast}), + {noreply, St}. + +handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) -> + StNext = do_collect_garbage(St), + {noreply, start_timer(StNext#st{next_gc_timer = undefined})}. + +% do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() -> +% Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()), +% ok = maybe_report(Stats, St), +% ok. + +maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) -> + do_collect_garbage(St); +maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) -> + case FinishedAt > CalledAt of + true -> + St; + false -> + reset_timer(do_collect_garbage(St)) + end. + +do_collect_garbage(St = #st{storage = Storage}) -> + Stats = collect_garbage(Storage), + ok = maybe_report(Stats, St), + St#st{last_gc = Stats}. + +maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 -> + ?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage}); +maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) -> + ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}). + +start_timer(St = #st{next_gc_timer = undefined}) -> + Delay = emqx_ft_conf:gc_interval(St#st.storage), + St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)}. + +reset_timer(St = #st{next_gc_timer = undefined}) -> + start_timer(St); +reset_timer(St = #st{next_gc_timer = TRef}) -> + ok = emqx_misc:cancel_timer(TRef), + start_timer(St#st{next_gc_timer = undefined}). + +%% + +collect_garbage(Storage) -> + Stats = init_gcstats(), + {ok, Transfers} = emqx_ft_storage_fs:transfers(Storage), + collect_garbage(Storage, Transfers, Stats). + +collect_garbage(Storage, Transfers, Stats) -> + finish_gcstats( + maps:fold( + fun(Transfer, TransferInfo, StatsAcc) -> + % TODO: throttling? + try_collect_transfer(Storage, Transfer, TransferInfo, StatsAcc) + end, + Stats, + Transfers + ) + ). + +try_collect_transfer(Storage, Transfer, #{status := complete}, Stats) -> + % File transfer is complete. + % We should be good to delete fragments and temporary files with their respective + % directories altogether. + % TODO: file expiration + {_, Stats1} = collect_fragments(Storage, Transfer, Stats), + {_, Stats2} = collect_tempfiles(Storage, Transfer, Stats1), + Stats2; +try_collect_transfer(Storage, Transfer, #{status := incomplete}, Stats) -> + % File transfer is still incomplete. + % Any outdated fragments and temporary files should be collectable. As a kind of + % heuristic we only delete transfer directory itself only if it is also outdated + % _and was empty at the start of GC_, as a precaution against races between + % writers and GCs. + TTL = get_segments_ttl(Storage, Transfer), + Cutoff = erlang:system_time(second) + TTL, + {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats), + {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1), + % TODO: collect empty directories separately + case FragCleaned and TempCleaned of + true -> + collect_transfer_directory(Storage, Transfer, Stats2); + false -> + Stats2 + end. + +collect_fragments(Storage, Transfer, Stats) -> + Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment), + collect_filepath(Dirname, true, Stats). + +collect_tempfiles(Storage, Transfer, Stats) -> + Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary), + collect_filepath(Dirname, true, Stats). + +collect_outdated_fragments(Storage, Transfer, Cutoff, Stats) -> + Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, fragment), + Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end, + collect_filepath(Dirname, Filter, Stats). + +collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats) -> + Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer, temporary), + Filter = fun(_Filepath, #file_info{mtime = ModifiedAt}) -> ModifiedAt < Cutoff end, + collect_filepath(Dirname, Filter, Stats). + +collect_transfer_directory(Storage, Transfer, Stats) -> + Dirname = emqx_ft_storage_fs:get_subdir(Storage, Transfer), + StatsNext = collect_empty_directory(Dirname, Stats), + collect_parents(Dirname, StatsNext). + +collect_parents(Dirname, Stats) -> + Parent = filename:dirname(Dirname), + case file:del_dir(Parent) of + ok -> + collect_parents(Parent, account_gcstat_directory(Stats)); + {error, enoent} -> + collect_parents(Parent, Stats); + {error, eexist} -> + Stats; + {error, Reason} -> + register_gcstat_error({directory, Parent}, Reason, Stats) + end. + +% collect_outdated_fragment(#{path := Filepath, fileinfo := Fileinfo}, Cutoff, Stats) -> +% case Fileinfo#file_info.mtime of +% ModifiedAt when ModifiedAt < Cutoff -> +% collect_filepath(Filepath, Fileinfo, Stats); +% _ -> +% Stats +% end. + +-spec collect_filepath(file:name(), Filter, gcstats()) -> {boolean(), gcstats()} when + Filter :: boolean() | fun((file:name(), file:file_info()) -> boolean()). +collect_filepath(Filepath, Filter, Stats) -> + case file:read_file_info(Filepath) of + {ok, Fileinfo} -> + collect_filepath(Filepath, Fileinfo, Filter, Stats); + {error, enoent} -> + {true, Stats}; + {error, Reason} -> + {false, register_gcstat_error({path, Filepath}, Reason, Stats)} + end. + +collect_filepath(Filepath, #file_info{type = directory} = Fileinfo, Filter, Stats) -> + collect_directory(Filepath, Fileinfo, Filter, Stats); +collect_filepath(Filepath, #file_info{type = regular} = Fileinfo, Filter, Stats) -> + case filter_filepath(Filter, Filepath, Fileinfo) andalso file:delete(Filepath, [raw]) of + false -> + {false, Stats}; + ok -> + {true, account_gcstat(Fileinfo, Stats)}; + {error, enoent} -> + {true, Stats}; + {error, Reason} -> + {false, register_gcstat_error({file, Filepath}, Reason, Stats)} + end; +collect_filepath(Filepath, Fileinfo, _Filter, Stats) -> + {false, register_gcstat_error({file, Filepath}, {unexpected, Fileinfo}, Stats)}. + +collect_directory(Dirpath, Fileinfo, Filter, Stats) -> + case file:list_dir(Dirpath) of + {ok, Filenames} -> + {Clean, StatsNext} = collect_files(Dirpath, Filenames, Filter, Stats), + case Clean andalso filter_filepath(Filter, Dirpath, Fileinfo) of + true -> + {true, collect_empty_directory(Dirpath, StatsNext)}; + _ -> + {false, StatsNext} + end; + {error, Reason} -> + {false, register_gcstat_error({directory, Dirpath}, Reason, Stats)} + end. + +collect_files(Dirname, Filenames, Filter, Stats) -> + lists:foldl( + fun(Filename, {Complete, StatsAcc}) -> + Filepath = filename:join(Dirname, Filename), + {Collected, StatsNext} = collect_filepath(Filepath, Filter, StatsAcc), + {Collected andalso Complete, StatsNext} + end, + {true, Stats}, + Filenames + ). + +collect_empty_directory(Dirpath, Stats) -> + case file:del_dir(Dirpath) of + ok -> + account_gcstat_directory(Stats); + {error, enoent} -> + Stats; + {error, Reason} -> + register_gcstat_error({directory, Dirpath}, Reason, Stats) + end. + +filter_filepath(Filter, _, _) when is_boolean(Filter) -> + Filter; +filter_filepath(Filter, Filepath, Fileinfo) when is_function(Filter) -> + Filter(Filepath, Fileinfo). + +get_segments_ttl(Storage, Transfer) -> + {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage), + clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(Storage, Transfer)). + +try_get_filemeta_ttl(Storage, Transfer) -> + case emqx_ft_storage_fs:read_filemeta(Storage, Transfer) of + {ok, Filemeta} -> + maps:get(segments_ttl, Filemeta, undefined); + {error, _} -> + undefined + end. + +clamp(Min, Max, V) -> + min(Max, max(Min, V)). + +% try_collect(_Subject, ok = Result, Then, _Stats) -> +% Then(Result); +% try_collect(_Subject, {ok, Result}, Then, _Stats) -> +% Then(Result); +% try_collect(Subject, {error, _} = Error, _Then, Stats) -> +% register_gcstat_error(Subject, Error, Stats). + +%% + +init_gcstats() -> + #gcstats{started_at = erlang:system_time()}. + +finish_gcstats(Stats) -> + Stats#gcstats{finished_at = erlang:system_time()}. + +account_gcstat(Fileinfo, Stats = #gcstats{files = Files, space = Space}) -> + Stats#gcstats{ + files = Files + 1, + space = Space + Fileinfo#file_info.size + }. + +account_gcstat_directory(Stats = #gcstats{directories = Directories}) -> + Stats#gcstats{ + directories = Directories + 1 + }. + +register_gcstat_error(Subject, Error, Stats = #gcstats{errors = Errors}) -> + Stats#gcstats{errors = Errors#{Subject => Error}}. + +%% + +get_storage_root(Storage) -> + maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index 8d388814c..3c28eae30 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -61,5 +61,5 @@ init([]) -> modules => [emqx_ft_responder_sup] }, - ChildSpecs = [Responder, AssemblerSup, FileReaderSup], + ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl new file mode 100644 index 000000000..c7fffbacd --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -0,0 +1,205 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_gc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + _ = application:load(emqx_ft), + ok = emqx_common_test_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([]), + ok. + +init_per_testcase(TC, Config) -> + _ = application:unset_env(emqx_ft, gc_interval), + _ = application:unset_env(emqx_ft, min_segments_ttl), + _ = application:unset_env(emqx_ft, max_segments_ttl), + ok = emqx_common_test_helpers:start_app( + emqx_ft, + fun(emqx_ft) -> + ok = emqx_config:put([file_transfer, storage], #{ + type => local, + root => mk_root(TC, Config) + }) + end + ), + Config. + +end_per_testcase(_TC, _Config) -> + ok = application:stop(emqx_ft), + ok. + +mk_root(TC, Config) -> + filename:join([?config(priv_dir, Config), <<"file_transfer">>, TC, atom_to_binary(node())]). + +%% + +t_gc_triggers_periodically(_Config) -> + Interval = 1000, + ok = application:set_env(emqx_ft, gc_interval, Interval), + ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()), + ?check_trace( + timer:sleep(Interval * 3), + fun(Trace) -> + [Event, _ | _] = ?of_kind(garbage_collection, Trace), + ?assertMatch( + #{ + stats := #gcstats{ + files = 0, + directories = 0, + space = 0, + errors = #{} = Errors + } + } when map_size(Errors) == 0, + Event + ) + end + ). + +t_gc_triggers_manually(_Config) -> + ?check_trace( + ?assertMatch( + #gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when + map_size(Errors) == 0, + emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage()) + ), + fun(Trace) -> + [Event] = ?of_kind(garbage_collection, Trace), + ?assertMatch( + #{stats := #gcstats{}}, + Event + ) + end + ). + +t_gc_complete_transfers(_Config) -> + Storage = emqx_ft_conf:storage(), + Transfers = [ + { + T1 = {<<"client1">>, mk_file_id()}, + "cat.cur", + emqx_ft_content_gen:new({?LINE, S1 = 42}, SS1 = 16) + }, + { + T2 = {<<"client2">>, mk_file_id()}, + "cat.ico", + emqx_ft_content_gen:new({?LINE, S2 = 420}, SS2 = 64) + }, + { + T3 = {<<"client42">>, mk_file_id()}, + "cat.jpg", + emqx_ft_content_gen:new({?LINE, S3 = 42000}, SS3 = 1024) + } + ], + % 1. Start all transfers + TransferSizes = emqx_misc:pmap( + fun(Transfer) -> start_transfer(Storage, Transfer) end, + Transfers + ), + ?assertEqual([S1, S2, S3], TransferSizes), + ?assertMatch( + #gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0, + emqx_ft_storage_fs_gc:collect(Storage) + ), + % 2. Complete just the first transfer + ?assertEqual( + ok, + complete_transfer(Storage, T1, S1) + ), + GCFiles1 = ceil(S1 / SS1) + 1, + ?assertMatch( + #gcstats{ + files = GCFiles1, + directories = 2, + space = Space, + errors = #{} = Es + } when Space > S1 andalso map_size(Es) == 0, + emqx_ft_storage_fs_gc:collect(Storage) + ), + % 3. Complete rest of transfers + ?assertEqual( + [ok, ok], + emqx_misc:pmap( + fun({Transfer, Size}) -> complete_transfer(Storage, Transfer, Size) end, + [{T2, S2}, {T3, S3}] + ) + ), + GCFiles2 = ceil(S2 / SS2) + 1, + GCFiles3 = ceil(S3 / SS3) + 1, + ?assertMatch( + #gcstats{ + files = Files, + directories = 4, + space = Space, + errors = #{} = Es + } when + Files == (GCFiles2 + GCFiles3) andalso + Space > (S2 + S3) andalso + map_size(Es) == 0, + emqx_ft_storage_fs_gc:collect(Storage) + ). + +start_transfer(Storage, {Transfer, Name, Gen}) -> + Meta = #{ + name => Name, + segments_ttl => 10 + }, + ?assertEqual( + ok, + emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta) + ), + emqx_ft_content_gen:fold( + fun({Content, SegmentNum, #{chunk_size := SegmentSize}}, _Transferred) -> + Offset = (SegmentNum - 1) * SegmentSize, + ?assertEqual( + ok, + emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content}) + ), + Offset + byte_size(Content) + end, + 0, + Gen + ). + +complete_transfer(Storage, Transfer, Size) -> + complete_transfer(Storage, Transfer, Size, 100). + +complete_transfer(Storage, Transfer, Size, Timeout) -> + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), + MRef = erlang:monitor(process, Pid), + Pid ! kickoff, + receive + {'DOWN', MRef, process, Pid, {shutdown, Result}} -> + Result + after Timeout -> + ct:fail("Assembler did not finish in time") + end. + +mk_file_id() -> + emqx_guid:to_hexstr(emqx_guid:gen()).