diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl index 0b919f7ab..2629bc737 100644 --- a/apps/emqx/src/emqx_maybe.erl +++ b/apps/emqx/src/emqx_maybe.erl @@ -23,6 +23,9 @@ -export([define/2]). -export([apply/2]). +-type t(T) :: maybe(T). +-export_type([t/1]). + -spec to_list(maybe(A)) -> [A]. to_list(undefined) -> []; diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 95982b849..d500a6344 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -301,8 +301,8 @@ store_filemeta(Transfer, Segment) -> emqx_ft_storage:store_filemeta(Transfer, Segment) catch C:E:S -> - ?SLOG(error, #{ - msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S + ?tp(error, "start_store_filemeta_failed", #{ + class => C, reason => E, stacktrace => S }), {error, {internal_error, E}} end. @@ -312,8 +312,8 @@ store_segment(Transfer, Segment) -> emqx_ft_storage:store_segment(Transfer, Segment) catch C:E:S -> - ?SLOG(error, #{ - msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S + ?tp(error, "start_store_segment_failed", #{ + class => C, reason => E, stacktrace => S }), {error, {internal_error, E}} end. @@ -323,8 +323,8 @@ assemble(Transfer, FinalSize) -> emqx_ft_storage:assemble(Transfer, FinalSize) catch C:E:S -> - ?SLOG(error, #{ - msg => "start_assemble_failed", class => C, reason => E, stacktrace => S + ?tp(error, "start_assemble_failed", #{ + class => C, reason => E, stacktrace => S }), {error, {internal_error, E}} end. @@ -334,8 +334,7 @@ transfer(Msg, FileId) -> {clientid_to_binary(ClientId), FileId}. on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> - ?SLOG(debug, #{ - msg => "on_complete", + ?tp(debug, "on_complete", #{ operation => Op, packet_id => PacketId, transfer => Transfer @@ -344,15 +343,13 @@ on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> {Mode, ok} when Mode == ack orelse Mode == down -> erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down -> - ?SLOG(error, #{ - msg => Op ++ "_failed", + ?tp(error, Op ++ "_failed", #{ transfer => Transfer, reason => Reason }), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}); timeout -> - ?SLOG(error, #{ - msg => Op ++ "_timed_out", + ?tp(error, Op ++ "_timed_out", #{ transfer => Transfer }), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 3a352cd10..767930f98 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -159,7 +159,7 @@ handle_event(internal, _, {assemble, []}, St = #{}) -> {next_state, complete, St, ?internal([])}; handle_event(internal, _, complete, St = #{export := Export}) -> Result = emqx_ft_storage_exporter:complete(Export), - ok = maybe_garbage_collect(Result, St), + _ = maybe_garbage_collect(Result, St), {stop, {shutdown, Result}, maps:remove(export, St)}. -spec terminate(_Reason, state(), stdata()) -> _. diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index dd9806e95..1e531ecdb 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -45,49 +45,28 @@ -type milliseconds() :: non_neg_integer(). -type seconds() :: non_neg_integer(). -%% 5 minutes (s) --define(DEFAULT_MIN_SEGMENTS_TTL, 300). -%% 1 day (s) --define(DEFAULT_MAX_SEGMENTS_TTL, 86400). -%% 1 minute (ms) --define(DEFAULT_GC_INTERVAL, 60000). - %%-------------------------------------------------------------------- %% Accessors %%-------------------------------------------------------------------- -spec storage() -> _Storage. storage() -> - emqx_config:get([file_transfer, storage]). + emqx_config:get([file_transfer, storage], undefined). --spec gc_interval(_Storage) -> milliseconds(). -gc_interval(_Storage) -> - Conf = assert_storage(local), - emqx_utils_maps:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL). +-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()). +gc_interval(Conf = #{type := local}) -> + emqx_utils_maps:deep_get([segments, gc, interval], Conf); +gc_interval(_) -> + undefined. --spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}. -segments_ttl(_Storage) -> - Conf = assert_storage(local), +-spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}). +segments_ttl(Conf = #{type := local}) -> { - emqx_utils_maps:deep_get( - [segments, gc, minimum_segments_ttl], - Conf, - ?DEFAULT_MIN_SEGMENTS_TTL - ), - emqx_utils_maps:deep_get( - [segments, gc, maximum_segments_ttl], - Conf, - ?DEFAULT_MAX_SEGMENTS_TTL - ) - }. - -assert_storage(Type) -> - case storage() of - Conf = #{type := Type} -> - Conf; - Conf -> - error({inapplicable, Conf}) - end. + emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf), + emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf) + }; +segments_ttl(_) -> + undefined. init_timeout() -> emqx_config:get([file_transfer, init_timeout]). @@ -104,10 +83,7 @@ store_segment_timeout() -> -spec load() -> ok. load() -> - ok = emqx_ft_storage_exporter:update_exporter( - undefined, - storage() - ), + ok = emqx_ft_storage:on_config_update(undefined, storage()), emqx_conf:add_handler([file_transfer], ?MODULE). -spec unload() -> ok. @@ -134,4 +110,4 @@ pre_config_update(_, Req, _Config) -> post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) -> OldStorageConfig = maps:get(storage, OldConfig, undefined), NewStorageConfig = maps:get(storage, NewConfig, undefined), - emqx_ft_storage_exporter:update_exporter(OldStorageConfig, NewStorageConfig). + emqx_ft_storage:on_config_update(OldStorageConfig, NewStorageConfig). diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index ce3710cc1..27c593b6c 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -25,6 +25,8 @@ -export([schema/1]). +-export([translate/1]). + -type json_value() :: null | boolean() @@ -82,13 +84,25 @@ fields(file_transfer) -> )}, {storage, mk( - hoconsc:union([ - ref(local_storage) - ]), + hoconsc:union( + fun + (all_union_members) -> + [ + % NOTE: by default storage is disabled + undefined, + ref(local_storage) + ]; + ({value, #{<<"type">> := <<"local">>}}) -> + [ref(local_storage)]; + ({value, #{<<"type">> := _}}) -> + throw(#{field_name => type, expected => "local"}); + (_) -> + [undefined] + end + ), #{ required => false, - desc => ?DESC("storage"), - default => default_storage() + desc => ?DESC("storage") } )} ]; @@ -108,18 +122,38 @@ fields(local_storage) -> ref(local_storage_segments), #{ desc => ?DESC("local_storage_segments"), - required => false + required => false, + default => #{ + <<"gc">> => #{} + } } )}, {exporter, mk( - hoconsc:union([ - ref(local_storage_exporter), - ref(s3_exporter) - ]), + hoconsc:union( + fun + (all_union_members) -> + [ + ref(local_storage_exporter), + ref(s3_exporter) + ]; + ({value, #{<<"type">> := <<"local">>}}) -> + [ref(local_storage_exporter)]; + ({value, #{<<"type">> := <<"s3">>}}) -> + [ref(s3_exporter)]; + ({value, #{<<"type">> := _}}) -> + throw(#{field_name => type, expected => "local | s3"}); + ({value, _}) -> + % NOTE: default + [ref(local_storage_exporter)] + end + ), #{ desc => ?DESC("local_storage_exporter"), - required => true + required => true, + default => #{ + <<"type">> => <<"local">> + } } )} ]; @@ -277,10 +311,11 @@ converter(unicode_string) -> ref(Ref) -> ref(?MODULE, Ref). -default_storage() -> - #{ - <<"type">> => <<"local">>, - <<"exporter">> => #{ - <<"type">> => <<"local">> - } - }. +translate(Conf) -> + [Root] = roots(), + maps:get( + Root, + hocon_tconf:check_plain( + ?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root] + ) + ). diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 79538a9c7..fee16cd09 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -27,7 +27,9 @@ files/0, with_storage_type/2, - with_storage_type/3 + with_storage_type/3, + + on_config_update/2 ] ). @@ -90,26 +92,35 @@ child_spec() -> -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> ok | {async, pid()} | {error, term()}. store_filemeta(Transfer, FileMeta) -> - Mod = mod(), - Mod:store_filemeta(storage(), Transfer, FileMeta). + with_storage(store_filemeta, [Transfer, FileMeta]). -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. store_segment(Transfer, Segment) -> - Mod = mod(), - Mod:store_segment(storage(), Transfer, Segment). + with_storage(store_segment, [Transfer, Segment]). -spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. assemble(Transfer, Size) -> - Mod = mod(), - Mod:assemble(storage(), Transfer, Size). + with_storage(assemble, [Transfer, Size]). -spec files() -> {ok, [file_info()]} | {error, term()}. files() -> - Mod = mod(), - Mod:files(storage()). + with_storage(files, []). + +-spec with_storage(atom() | function()) -> any(). +with_storage(Fun) -> + with_storage(Fun, []). + +-spec with_storage(atom() | function(), list(term())) -> any(). +with_storage(Fun, Args) -> + case storage() of + Storage = #{} -> + apply_storage(Storage, Fun, Args); + undefined -> + {error, disabled} + end. -spec with_storage_type(atom(), atom() | function()) -> any(). with_storage_type(Type, Fun) -> @@ -117,17 +128,61 @@ with_storage_type(Type, Fun) -> -spec with_storage_type(atom(), atom() | function(), list(term())) -> any(). with_storage_type(Type, Fun, Args) -> - Storage = storage(), - case Storage of - #{type := Type} when is_function(Fun) -> - apply(Fun, [Storage | Args]); - #{type := Type} when is_atom(Fun) -> - Mod = mod(Storage), - apply(Mod, Fun, [Storage | Args]); - disabled -> - {error, disabled}; - _ -> - {error, {invalid_storage_type, Type}} + with_storage(fun(Storage) -> + case Storage of + #{type := Type} -> + apply_storage(Storage, Fun, Args); + _ -> + {error, {invalid_storage_type, Storage}} + end + end). + +apply_storage(Storage, Fun, Args) when is_atom(Fun) -> + apply(mod(Storage), Fun, [Storage | Args]); +apply_storage(Storage, Fun, Args) when is_function(Fun) -> + apply(Fun, [Storage | Args]). + +%% + +-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) -> + ok. +on_config_update(Storage, Storage) -> + ok; +on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) -> + ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew); +on_config_update(StorageOld, StorageNew) -> + _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld), + _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew), + _ = emqx_maybe:apply( + fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end, + StorageNew + ), + ok. + +on_storage_start(Storage = #{type := _}) -> + lists:foreach( + fun(ChildSpec) -> + {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec) + end, + child_spec(Storage) + ). + +on_storage_stop(Storage = #{type := _}) -> + lists:foreach( + fun(#{id := ChildId}) -> + _ = supervisor:terminate_child(emqx_ft_sup, ChildId), + ok = supervisor:delete_child(emqx_ft_sup, ChildId) + end, + child_spec(Storage) + ). + +child_spec(Storage) -> + try + Mod = mod(Storage), + Mod:child_spec(Storage) + catch + error:disabled -> []; + error:undef -> [] end. %%-------------------------------------------------------------------- @@ -144,7 +199,6 @@ mod(Storage) -> case Storage of #{type := local} -> emqx_ft_storage_fs; - disabled -> + undefined -> error(disabled) - % emqx_ft_storage_dummy end. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 61241bd6f..72128cb40 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -31,7 +31,7 @@ -export([list/1]). %% Lifecycle API --export([update_exporter/2]). +-export([on_config_update/2]). %% Internal API -export([exporter/1]). @@ -141,30 +141,28 @@ list(Storage) -> %% Lifecycle --spec update_exporter(emqx_config:config(), emqx_config:config()) -> ok | {error, term()}. -update_exporter( - #{exporter := #{type := OldType}} = OldConfig, - #{exporter := #{type := OldType}} = NewConfig -) -> - {ExporterMod, OldExporterOpts} = exporter(OldConfig), - {_NewExporterMod, NewExporterOpts} = exporter(NewConfig), - ExporterMod:update(OldExporterOpts, NewExporterOpts); -update_exporter( - #{exporter := _} = OldConfig, - #{exporter := _} = NewConfig -) -> - {OldExporterMod, OldExporterOpts} = exporter(OldConfig), - {NewExporterMod, NewExporterOpts} = exporter(NewConfig), - ok = OldExporterMod:stop(OldExporterOpts), - NewExporterMod:start(NewExporterOpts); -update_exporter(undefined, NewConfig) -> - {ExporterMod, ExporterOpts} = exporter(NewConfig), - ExporterMod:start(ExporterOpts); -update_exporter(OldConfig, undefined) -> - {ExporterMod, ExporterOpts} = exporter(OldConfig), - ExporterMod:stop(ExporterOpts); -update_exporter(_, _) -> +-spec on_config_update(storage(), storage()) -> ok | {error, term()}. +on_config_update(StorageOld, StorageNew) -> + on_exporter_update( + emqx_maybe:apply(fun exporter/1, StorageOld), + emqx_maybe:apply(fun exporter/1, StorageNew) + ). + +on_exporter_update(Config, Config) -> + ok; +on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) -> + ExporterMod:update(ConfigOld, ConfigNew); +on_exporter_update(ExporterOld, ExporterNew) -> + _ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld), + _ = emqx_maybe:apply(fun start_exporter/1, ExporterNew), ok. + +start_exporter({ExporterMod, ExporterOpts}) -> + ok = ExporterMod:start(ExporterOpts). + +stop_exporter({ExporterMod, ExporterOpts}) -> + ok = ExporterMod:stop(ExporterOpts). + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 1b1d9ecf7..5754d2cfc 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -47,6 +47,8 @@ -export([files/1]). +-export([on_config_update/2]). + -export_type([storage/0]). -export_type([filefrag/1]). -export_type([filefrag/0]). @@ -219,6 +221,13 @@ files(Storage) -> %% +on_config_update(StorageOld, StorageNew) -> + % NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely + ok = emqx_ft_storage_fs_gc:reset(StorageNew), + emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew). + +%% + -spec transfers(storage()) -> {ok, #{transfer() => transferinfo()}}. transfers(Storage) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl index 0f61e65b7..692a270e3 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -29,8 +29,9 @@ -export([start_link/1]). --export([collect/1]). +-export([collect/0]). -export([collect/3]). +-export([reset/0]). -export([reset/1]). -behaviour(gen_server). @@ -40,38 +41,43 @@ -export([handle_info/2]). -record(st, { - storage :: emqx_ft_storage_fs:storage(), next_gc_timer :: maybe(reference()), last_gc :: maybe(gcstats()) }). -type gcstats() :: #gcstats{}. +-define(IS_ENABLED(INTERVAL), (is_integer(INTERVAL) andalso INTERVAL > 0)). + %% start_link(Storage) -> - gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []). + gen_server:start_link(mk_server_ref(global), ?MODULE, Storage, []). --spec collect(emqx_ft_storage_fs:storage()) -> gcstats(). -collect(Storage) -> - gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity). +-spec collect() -> gcstats(). +collect() -> + gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity). + +-spec reset() -> ok. +reset() -> + reset(emqx_ft_conf:storage()). -spec reset(emqx_ft_storage_fs:storage()) -> ok. reset(Storage) -> - gen_server:cast(mk_server_ref(Storage), reset). + gen_server:cast(mk_server_ref(global), {reset, gc_interval(Storage)}). collect(Storage, Transfer, Nodes) -> - gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}). + gc_enabled(Storage) andalso cast_collect(mk_server_ref(global), Storage, Transfer, Nodes). -mk_server_ref(Storage) -> +mk_server_ref(Name) -> % TODO - {via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}. + {via, gproc, {n, l, {?MODULE, Name}}}. %% init(Storage) -> - St = #st{storage = Storage}, - {ok, start_timer(St)}. + St = #st{}, + {ok, start_timer(gc_interval(Storage), St)}. handle_call({collect, CalledAt}, _From, St) -> StNext = maybe_collect_garbage(CalledAt, St), @@ -80,22 +86,17 @@ handle_call(Call, From, St) -> ?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}), {noreply, St}. -handle_cast({collect, Transfer, [Node | Rest]}, St) -> - case gc_enabled(St) of - true -> - ok = do_collect_transfer(Transfer, Node, St), - case Rest of - [_ | _] -> - gen_server:cast(self(), {collect, Transfer, Rest}); - [] -> - ok - end; - false -> - skip +handle_cast({collect, Storage, Transfer, [Node | Rest]}, St) -> + ok = do_collect_transfer(Storage, Transfer, Node, St), + case Rest of + [_ | _] -> + cast_collect(self(), Storage, Transfer, Rest); + [] -> + ok end, {noreply, St}; -handle_cast(reset, St) -> - {noreply, reset_timer(St)}; +handle_cast({reset, Interval}, St) -> + {noreply, start_timer(Interval, cancel_timer(St))}; handle_cast(Cast, St) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Cast}), {noreply, St}. @@ -104,14 +105,17 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) -> StNext = do_collect_garbage(St), {noreply, start_timer(StNext#st{next_gc_timer = undefined})}. -do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() -> +do_collect_transfer(Storage, Transfer, Node, St = #st{}) when Node == node() -> Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()), ok = maybe_report(Stats, St), ok; -do_collect_transfer(_Transfer, _Node, _St = #st{}) -> +do_collect_transfer(_Storage, _Transfer, _Node, _St = #st{}) -> % TODO ok. +cast_collect(Ref, Storage, Transfer, Nodes) -> + gen_server:cast(Ref, {collect, Storage, Transfer, Nodes}). + maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) -> do_collect_garbage(St); maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) -> @@ -119,36 +123,41 @@ maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = Finish true -> St; false -> - reset_timer(do_collect_garbage(St)) + start_timer(do_collect_garbage(cancel_timer(St))) end. -do_collect_garbage(St = #st{storage = Storage}) -> - Stats = collect_garbage(Storage), - ok = maybe_report(Stats, St), - St#st{last_gc = Stats}. +do_collect_garbage(St = #st{}) -> + emqx_ft_storage:with_storage_type(local, fun(Storage) -> + Stats = collect_garbage(Storage), + ok = maybe_report(Stats, Storage), + St#st{last_gc = Stats} + end). -maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 -> +maybe_report(#gcstats{errors = Errors}, Storage) when map_size(Errors) > 0 -> ?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage}); -maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) -> +maybe_report(#gcstats{} = _Stats, _Storage) -> ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}). -start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) -> - case emqx_ft_conf:gc_interval(Storage) of - Delay when Delay > 0 -> - St#st{next_gc_timer = emqx_utils:start_timer(Delay, collect)}; - 0 -> - ?SLOG(warning, #{msg => "periodic_gc_disabled"}), - St - end. +start_timer(St) -> + start_timer(gc_interval(emqx_ft_conf:storage()), St). -reset_timer(St = #st{next_gc_timer = undefined}) -> - start_timer(St); -reset_timer(St = #st{next_gc_timer = TRef}) -> +start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) -> + St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)}; +start_timer(Interval, St) -> + ?SLOG(warning, #{msg => "periodic_gc_disabled", interval => Interval}), + St. + +cancel_timer(St = #st{next_gc_timer = undefined}) -> + St; +cancel_timer(St = #st{next_gc_timer = TRef}) -> ok = emqx_utils:cancel_timer(TRef), - start_timer(St#st{next_gc_timer = undefined}). + St#st{next_gc_timer = undefined}. -gc_enabled(St) -> - emqx_ft_conf:gc_interval(St#st.storage) > 0. +gc_enabled(Storage) -> + ?IS_ENABLED(gc_interval(Storage)). + +gc_interval(Storage) -> + emqx_ft_conf:gc_interval(Storage). %% @@ -175,8 +184,13 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) -> % heuristic we only delete transfer directory itself only if it is also outdated % _and was empty at the start of GC_, as a precaution against races between % writers and GCs. - TTL = get_segments_ttl(Storage, TransferInfo), - Cutoff = erlang:system_time(second) - TTL, + Cutoff = + case get_segments_ttl(Storage, TransferInfo) of + TTL when is_integer(TTL) -> + erlang:system_time(second) - TTL; + undefined -> + 0 + end, {FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats), {TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1), % TODO: collect empty directories separately @@ -338,16 +352,17 @@ filepath_to_binary(S) -> unicode:characters_to_binary(S, unicode, file:native_name_encoding()). get_segments_ttl(Storage, TransferInfo) -> - {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage), - clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)). + clamp(emqx_ft_conf:segments_ttl(Storage), try_get_filemeta_ttl(TransferInfo)). try_get_filemeta_ttl(#{filemeta := Filemeta}) -> maps:get(segments_ttl, Filemeta, undefined); try_get_filemeta_ttl(#{}) -> undefined. -clamp(Min, Max, V) -> - min(Max, max(Min, V)). +clamp({Min, Max}, V) -> + min(Max, max(Min, V)); +clamp(undefined, V) -> + V. %% diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index 3c28eae30..8d388814c 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -61,5 +61,5 @@ init([]) -> modules => [emqx_ft_responder_sup] }, - ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()], + ChildSpecs = [Responder, AssemblerSup, FileReaderSup], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 4b5610f51..a7323fc0e 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -46,8 +46,8 @@ init_per_testcase(TC, Config) -> ok = snabbkaffe:start_trace(), {ok, Pid} = emqx_ft_assembler_sup:start_link(), [ - {storage_root, "file_transfer_root"}, - {exports_root, "file_transfer_exports"}, + {storage_root, <<"file_transfer_root">>}, + {exports_root, <<"file_transfer_exports">>}, {file_id, atom_to_binary(TC)}, {assembler_sup, Pid} | Config @@ -246,13 +246,17 @@ exporter(Config) -> emqx_ft_storage_exporter:exporter(storage(Config)). storage(Config) -> - #{ - type => local, - segments => #{ - root => ?config(storage_root, Config) - }, - exporter => #{ - type => local, - root => ?config(exports_root, Config) - } - }. + maps:get( + storage, + emqx_ft_schema:translate(#{ + <<"storage">> => #{ + <<"type">> => <<"local">>, + <<"segments">> => #{ + <<"root">> => ?config(storage_root, Config) + }, + <<"exporter">> => #{ + <<"root">> => ?config(exports_root, Config) + } + } + }) + ). diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index c616681dd..89b0e895d 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -85,3 +86,86 @@ t_update_config(_Config) -> 5 * 60 * 1000, emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) ). + +t_remove_restore_config(Config) -> + ?assertMatch( + {ok, _}, + emqx_conf:update([file_transfer, storage], #{<<"type">> => <<"local">>}, #{}) + ), + ?assertEqual( + 60 * 60 * 1000, + emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) + ), + % Verify that transfers work + ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>), + ?assertMatch( + {ok, _}, + emqx_conf:remove([file_transfer, storage], #{}) + ), + ?assertEqual( + undefined, + emqx_ft_conf:storage() + ), + ?assertEqual( + undefined, + emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) + ), + ClientId = gen_clientid(), + Client = emqx_ft_test_helpers:start_client(ClientId), + % Verify that transfers fail cleanly when storage is disabled + ?check_trace( + ?assertMatch( + {ok, #{reason_code_name := unspecified_error}}, + emqtt:publish( + Client, + <<"$file/f2/init">>, + emqx_utils_json:encode(emqx_ft:encode_filemeta(#{name => "f2", size => 42})), + 1 + ) + ), + fun(Trace) -> + ?assertMatch( + [#{transfer := {ClientId, <<"f2">>}, reason := {error, disabled}}], + ?of_kind("store_filemeta_failed", Trace) + ) + end + ), + % Restore local storage backend + Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])), + ?assertMatch( + {ok, _}, + emqx_conf:update( + [file_transfer, storage], + #{ + <<"type">> => <<"local">>, + <<"segments">> => #{ + <<"root">> => Root, + <<"gc">> => #{<<"interval">> => <<"1s">>} + } + }, + #{} + ) + ), + % Verify that GC is getting triggered eventually + ?check_trace( + ?block_until(#{?snk_kind := garbage_collection}, 5000, 0), + fun(Trace) -> + ?assertMatch( + [ + #{ + ?snk_kind := garbage_collection, + storage := #{ + type := local, + segments := #{root := Root} + } + } + ], + ?of_kind(garbage_collection, Trace) + ) + end + ), + % Verify that transfers work again + ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>). + +gen_clientid() -> + emqx_base62:encode(emqx_guid:gen()). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index abf2749ed..065e9ae0a 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -68,7 +68,7 @@ end_per_testcase(_TC, _Config) -> t_gc_triggers_periodically(_Config) -> Interval = 500, ok = set_gc_config(interval, Interval), - ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()), + ok = emqx_ft_storage_fs_gc:reset(), ?check_trace( timer:sleep(Interval * 3), fun(Trace) -> @@ -92,7 +92,7 @@ t_gc_triggers_manually(_Config) -> ?assertMatch( #gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when map_size(Errors) == 0, - emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage()) + emqx_ft_storage_fs_gc:collect() ), fun(Trace) -> [Event] = ?of_kind(garbage_collection, Trace), @@ -108,7 +108,7 @@ t_gc_complete_transfers(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 3), ok = set_gc_config(interval, 500), - ok = emqx_ft_storage_fs_gc:reset(Storage), + ok = emqx_ft_storage_fs_gc:reset(), Transfers = [ { T1 = {<<"client1">>, mk_file_id()}, @@ -134,7 +134,7 @@ t_gc_complete_transfers(_Config) -> ?assertEqual([S1, S2, S3], TransferSizes), ?assertMatch( #gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0, - emqx_ft_storage_fs_gc:collect(Storage) + emqx_ft_storage_fs_gc:collect() ), % 2. Complete just the first transfer {ok, {ok, Event}} = ?wait_async_action( @@ -224,7 +224,7 @@ t_gc_incomplete_transfers(_Config) -> _ = emqx_utils:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers), % 2. Enable periodic GC every 0.5 seconds. ok = set_gc_config(interval, 500), - ok = emqx_ft_storage_fs_gc:reset(Storage), + ok = emqx_ft_storage_fs_gc:reset(), % 3. First we need the first transfer to be collected. {ok, _} = ?block_until( #{ @@ -304,7 +304,7 @@ t_gc_handling_errors(_Config) -> {directory, DirTransfer2} := eexist } } when Files == ?NSEGS(Size, SegSize) * 2 andalso Space > Size * 2, - emqx_ft_storage_fs_gc:collect(Storage) + emqx_ft_storage_fs_gc:collect() ), fun(Trace) -> ?assertMatch( diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 704e55454..11ddf191b 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -68,15 +68,22 @@ tcp_port(Node) -> root(Config, Node, Tail) -> filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]). +start_client(ClientId) -> + start_client(ClientId, node()). + +start_client(ClientId, Node) -> + Port = tcp_port(Node), + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), + {ok, _} = emqtt:connect(Client), + Client. + upload_file(ClientId, FileId, Name, Data) -> upload_file(ClientId, FileId, Name, Data, node()). upload_file(ClientId, FileId, Name, Data, Node) -> - Port = tcp_port(Node), - Size = byte_size(Data), + C1 = start_client(ClientId, Node), - {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), - {ok, _} = emqtt:connect(C1), + Size = byte_size(Data), Meta = #{ name => Name, expire_at => erlang:system_time(_Unit = second) + 3600,