feat(ft): properly propagate config updates

Ensure that:
* Storage config might be removed.
* Local FS GC process is set up when Local FS storage is configured.
* Local FS GC process gets its timer reset on config updates.
* Storage / exporter gets chosen based on `type` only.
* Exporter config updates propagated as before.

Also employ `emqx_ft_schema:translate/1` instead of duplicating
defaults where applicable.
This commit is contained in:
Andrew Mayorov 2023-04-24 20:28:58 +03:00
parent 4574597175
commit 5efd590ca4
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
14 changed files with 376 additions and 194 deletions

View File

@ -23,6 +23,9 @@
-export([define/2]).
-export([apply/2]).
-type t(T) :: maybe(T).
-export_type([t/1]).
-spec to_list(maybe(A)) -> [A].
to_list(undefined) ->
[];

View File

@ -301,8 +301,8 @@ store_filemeta(Transfer, Segment) ->
emqx_ft_storage:store_filemeta(Transfer, Segment)
catch
C:E:S ->
?SLOG(error, #{
msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S
?tp(error, "start_store_filemeta_failed", #{
class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@ -312,8 +312,8 @@ store_segment(Transfer, Segment) ->
emqx_ft_storage:store_segment(Transfer, Segment)
catch
C:E:S ->
?SLOG(error, #{
msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S
?tp(error, "start_store_segment_failed", #{
class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@ -323,8 +323,8 @@ assemble(Transfer, FinalSize) ->
emqx_ft_storage:assemble(Transfer, FinalSize)
catch
C:E:S ->
?SLOG(error, #{
msg => "start_assemble_failed", class => C, reason => E, stacktrace => S
?tp(error, "start_assemble_failed", #{
class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@ -334,8 +334,7 @@ transfer(Msg, FileId) ->
{clientid_to_binary(ClientId), FileId}.
on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
?SLOG(debug, #{
msg => "on_complete",
?tp(debug, "on_complete", #{
operation => Op,
packet_id => PacketId,
transfer => Transfer
@ -344,15 +343,13 @@ on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
{Mode, ok} when Mode == ack orelse Mode == down ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
{Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
?SLOG(error, #{
msg => Op ++ "_failed",
?tp(error, Op ++ "_failed", #{
transfer => Transfer,
reason => Reason
}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
timeout ->
?SLOG(error, #{
msg => Op ++ "_timed_out",
?tp(error, Op ++ "_timed_out", #{
transfer => Transfer
}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})

View File

@ -159,7 +159,7 @@ handle_event(internal, _, {assemble, []}, St = #{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #{export := Export}) ->
Result = emqx_ft_storage_exporter:complete(Export),
ok = maybe_garbage_collect(Result, St),
_ = maybe_garbage_collect(Result, St),
{stop, {shutdown, Result}, maps:remove(export, St)}.
-spec terminate(_Reason, state(), stdata()) -> _.

View File

@ -45,49 +45,28 @@
-type milliseconds() :: non_neg_integer().
-type seconds() :: non_neg_integer().
%% 5 minutes (s)
-define(DEFAULT_MIN_SEGMENTS_TTL, 300).
%% 1 day (s)
-define(DEFAULT_MAX_SEGMENTS_TTL, 86400).
%% 1 minute (ms)
-define(DEFAULT_GC_INTERVAL, 60000).
%%--------------------------------------------------------------------
%% Accessors
%%--------------------------------------------------------------------
-spec storage() -> _Storage.
storage() ->
emqx_config:get([file_transfer, storage]).
emqx_config:get([file_transfer, storage], undefined).
-spec gc_interval(_Storage) -> milliseconds().
gc_interval(_Storage) ->
Conf = assert_storage(local),
emqx_utils_maps:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL).
-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
gc_interval(Conf = #{type := local}) ->
emqx_utils_maps:deep_get([segments, gc, interval], Conf);
gc_interval(_) ->
undefined.
-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
segments_ttl(_Storage) ->
Conf = assert_storage(local),
-spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
segments_ttl(Conf = #{type := local}) ->
{
emqx_utils_maps:deep_get(
[segments, gc, minimum_segments_ttl],
Conf,
?DEFAULT_MIN_SEGMENTS_TTL
),
emqx_utils_maps:deep_get(
[segments, gc, maximum_segments_ttl],
Conf,
?DEFAULT_MAX_SEGMENTS_TTL
)
}.
assert_storage(Type) ->
case storage() of
Conf = #{type := Type} ->
Conf;
Conf ->
error({inapplicable, Conf})
end.
emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
};
segments_ttl(_) ->
undefined.
init_timeout() ->
emqx_config:get([file_transfer, init_timeout]).
@ -104,10 +83,7 @@ store_segment_timeout() ->
-spec load() -> ok.
load() ->
ok = emqx_ft_storage_exporter:update_exporter(
undefined,
storage()
),
ok = emqx_ft_storage:on_config_update(undefined, storage()),
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
@ -134,4 +110,4 @@ pre_config_update(_, Req, _Config) ->
post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) ->
OldStorageConfig = maps:get(storage, OldConfig, undefined),
NewStorageConfig = maps:get(storage, NewConfig, undefined),
emqx_ft_storage_exporter:update_exporter(OldStorageConfig, NewStorageConfig).
emqx_ft_storage:on_config_update(OldStorageConfig, NewStorageConfig).

View File

@ -25,6 +25,8 @@
-export([schema/1]).
-export([translate/1]).
-type json_value() ::
null
| boolean()
@ -82,13 +84,25 @@ fields(file_transfer) ->
)},
{storage,
mk(
hoconsc:union([
hoconsc:union(
fun
(all_union_members) ->
[
% NOTE: by default storage is disabled
undefined,
ref(local_storage)
]),
];
({value, #{<<"type">> := <<"local">>}}) ->
[ref(local_storage)];
({value, #{<<"type">> := _}}) ->
throw(#{field_name => type, expected => "local"});
(_) ->
[undefined]
end
),
#{
required => false,
desc => ?DESC("storage"),
default => default_storage()
desc => ?DESC("storage")
}
)}
];
@ -108,18 +122,38 @@ fields(local_storage) ->
ref(local_storage_segments),
#{
desc => ?DESC("local_storage_segments"),
required => false
required => false,
default => #{
<<"gc">> => #{}
}
}
)},
{exporter,
mk(
hoconsc:union([
hoconsc:union(
fun
(all_union_members) ->
[
ref(local_storage_exporter),
ref(s3_exporter)
]),
];
({value, #{<<"type">> := <<"local">>}}) ->
[ref(local_storage_exporter)];
({value, #{<<"type">> := <<"s3">>}}) ->
[ref(s3_exporter)];
({value, #{<<"type">> := _}}) ->
throw(#{field_name => type, expected => "local | s3"});
({value, _}) ->
% NOTE: default
[ref(local_storage_exporter)]
end
),
#{
desc => ?DESC("local_storage_exporter"),
required => true
required => true,
default => #{
<<"type">> => <<"local">>
}
}
)}
];
@ -277,10 +311,11 @@ converter(unicode_string) ->
ref(Ref) ->
ref(?MODULE, Ref).
default_storage() ->
#{
<<"type">> => <<"local">>,
<<"exporter">> => #{
<<"type">> => <<"local">>
}
}.
translate(Conf) ->
[Root] = roots(),
maps:get(
Root,
hocon_tconf:check_plain(
?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root]
)
).

View File

@ -27,7 +27,9 @@
files/0,
with_storage_type/2,
with_storage_type/3
with_storage_type/3,
on_config_update/2
]
).
@ -90,26 +92,35 @@ child_spec() ->
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
ok | {async, pid()} | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
Mod = mod(),
Mod:store_filemeta(storage(), Transfer, FileMeta).
with_storage(store_filemeta, [Transfer, FileMeta]).
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}.
store_segment(Transfer, Segment) ->
Mod = mod(),
Mod:store_segment(storage(), Transfer, Segment).
with_storage(store_segment, [Transfer, Segment]).
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Size).
with_storage(assemble, [Transfer, Size]).
-spec files() ->
{ok, [file_info()]} | {error, term()}.
files() ->
Mod = mod(),
Mod:files(storage()).
with_storage(files, []).
-spec with_storage(atom() | function()) -> any().
with_storage(Fun) ->
with_storage(Fun, []).
-spec with_storage(atom() | function(), list(term())) -> any().
with_storage(Fun, Args) ->
case storage() of
Storage = #{} ->
apply_storage(Storage, Fun, Args);
undefined ->
{error, disabled}
end.
-spec with_storage_type(atom(), atom() | function()) -> any().
with_storage_type(Type, Fun) ->
@ -117,17 +128,61 @@ with_storage_type(Type, Fun) ->
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
Storage = storage(),
with_storage(fun(Storage) ->
case Storage of
#{type := Type} when is_function(Fun) ->
apply(Fun, [Storage | Args]);
#{type := Type} when is_atom(Fun) ->
Mod = mod(Storage),
apply(Mod, Fun, [Storage | Args]);
disabled ->
{error, disabled};
#{type := Type} ->
apply_storage(Storage, Fun, Args);
_ ->
{error, {invalid_storage_type, Type}}
{error, {invalid_storage_type, Storage}}
end
end).
apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
apply(mod(Storage), Fun, [Storage | Args]);
apply_storage(Storage, Fun, Args) when is_function(Fun) ->
apply(Fun, [Storage | Args]).
%%
-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
ok.
on_config_update(Storage, Storage) ->
ok;
on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
on_config_update(StorageOld, StorageNew) ->
_ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
_ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
_ = emqx_maybe:apply(
fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
StorageNew
),
ok.
on_storage_start(Storage = #{type := _}) ->
lists:foreach(
fun(ChildSpec) ->
{ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
end,
child_spec(Storage)
).
on_storage_stop(Storage = #{type := _}) ->
lists:foreach(
fun(#{id := ChildId}) ->
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
ok = supervisor:delete_child(emqx_ft_sup, ChildId)
end,
child_spec(Storage)
).
child_spec(Storage) ->
try
Mod = mod(Storage),
Mod:child_spec(Storage)
catch
error:disabled -> [];
error:undef -> []
end.
%%--------------------------------------------------------------------
@ -144,7 +199,6 @@ mod(Storage) ->
case Storage of
#{type := local} ->
emqx_ft_storage_fs;
disabled ->
undefined ->
error(disabled)
% emqx_ft_storage_dummy
end.

View File

@ -31,7 +31,7 @@
-export([list/1]).
%% Lifecycle API
-export([update_exporter/2]).
-export([on_config_update/2]).
%% Internal API
-export([exporter/1]).
@ -141,30 +141,28 @@ list(Storage) ->
%% Lifecycle
-spec update_exporter(emqx_config:config(), emqx_config:config()) -> ok | {error, term()}.
update_exporter(
#{exporter := #{type := OldType}} = OldConfig,
#{exporter := #{type := OldType}} = NewConfig
) ->
{ExporterMod, OldExporterOpts} = exporter(OldConfig),
{_NewExporterMod, NewExporterOpts} = exporter(NewConfig),
ExporterMod:update(OldExporterOpts, NewExporterOpts);
update_exporter(
#{exporter := _} = OldConfig,
#{exporter := _} = NewConfig
) ->
{OldExporterMod, OldExporterOpts} = exporter(OldConfig),
{NewExporterMod, NewExporterOpts} = exporter(NewConfig),
ok = OldExporterMod:stop(OldExporterOpts),
NewExporterMod:start(NewExporterOpts);
update_exporter(undefined, NewConfig) ->
{ExporterMod, ExporterOpts} = exporter(NewConfig),
ExporterMod:start(ExporterOpts);
update_exporter(OldConfig, undefined) ->
{ExporterMod, ExporterOpts} = exporter(OldConfig),
ExporterMod:stop(ExporterOpts);
update_exporter(_, _) ->
-spec on_config_update(storage(), storage()) -> ok | {error, term()}.
on_config_update(StorageOld, StorageNew) ->
on_exporter_update(
emqx_maybe:apply(fun exporter/1, StorageOld),
emqx_maybe:apply(fun exporter/1, StorageNew)
).
on_exporter_update(Config, Config) ->
ok;
on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
ExporterMod:update(ConfigOld, ConfigNew);
on_exporter_update(ExporterOld, ExporterNew) ->
_ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld),
_ = emqx_maybe:apply(fun start_exporter/1, ExporterNew),
ok.
start_exporter({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:start(ExporterOpts).
stop_exporter({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:stop(ExporterOpts).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

View File

@ -47,6 +47,8 @@
-export([files/1]).
-export([on_config_update/2]).
-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
@ -219,6 +221,13 @@ files(Storage) ->
%%
on_config_update(StorageOld, StorageNew) ->
% NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
ok = emqx_ft_storage_fs_gc:reset(StorageNew),
emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
%%
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->

View File

@ -29,8 +29,9 @@
-export([start_link/1]).
-export([collect/1]).
-export([collect/0]).
-export([collect/3]).
-export([reset/0]).
-export([reset/1]).
-behaviour(gen_server).
@ -40,38 +41,43 @@
-export([handle_info/2]).
-record(st, {
storage :: emqx_ft_storage_fs:storage(),
next_gc_timer :: maybe(reference()),
last_gc :: maybe(gcstats())
}).
-type gcstats() :: #gcstats{}.
-define(IS_ENABLED(INTERVAL), (is_integer(INTERVAL) andalso INTERVAL > 0)).
%%
start_link(Storage) ->
gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []).
gen_server:start_link(mk_server_ref(global), ?MODULE, Storage, []).
-spec collect(emqx_ft_storage_fs:storage()) -> gcstats().
collect(Storage) ->
gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity).
-spec collect() -> gcstats().
collect() ->
gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
-spec reset() -> ok.
reset() ->
reset(emqx_ft_conf:storage()).
-spec reset(emqx_ft_storage_fs:storage()) -> ok.
reset(Storage) ->
gen_server:cast(mk_server_ref(Storage), reset).
gen_server:cast(mk_server_ref(global), {reset, gc_interval(Storage)}).
collect(Storage, Transfer, Nodes) ->
gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}).
gc_enabled(Storage) andalso cast_collect(mk_server_ref(global), Storage, Transfer, Nodes).
mk_server_ref(Storage) ->
mk_server_ref(Name) ->
% TODO
{via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}.
{via, gproc, {n, l, {?MODULE, Name}}}.
%%
init(Storage) ->
St = #st{storage = Storage},
{ok, start_timer(St)}.
St = #st{},
{ok, start_timer(gc_interval(Storage), St)}.
handle_call({collect, CalledAt}, _From, St) ->
StNext = maybe_collect_garbage(CalledAt, St),
@ -80,22 +86,17 @@ handle_call(Call, From, St) ->
?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
{noreply, St}.
handle_cast({collect, Transfer, [Node | Rest]}, St) ->
case gc_enabled(St) of
true ->
ok = do_collect_transfer(Transfer, Node, St),
handle_cast({collect, Storage, Transfer, [Node | Rest]}, St) ->
ok = do_collect_transfer(Storage, Transfer, Node, St),
case Rest of
[_ | _] ->
gen_server:cast(self(), {collect, Transfer, Rest});
cast_collect(self(), Storage, Transfer, Rest);
[] ->
ok
end;
false ->
skip
end,
{noreply, St};
handle_cast(reset, St) ->
{noreply, reset_timer(St)};
handle_cast({reset, Interval}, St) ->
{noreply, start_timer(Interval, cancel_timer(St))};
handle_cast(Cast, St) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Cast}),
{noreply, St}.
@ -104,14 +105,17 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
StNext = do_collect_garbage(St),
{noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
do_collect_transfer(Storage, Transfer, Node, St = #st{}) when Node == node() ->
Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
ok = maybe_report(Stats, St),
ok;
do_collect_transfer(_Transfer, _Node, _St = #st{}) ->
do_collect_transfer(_Storage, _Transfer, _Node, _St = #st{}) ->
% TODO
ok.
cast_collect(Ref, Storage, Transfer, Nodes) ->
gen_server:cast(Ref, {collect, Storage, Transfer, Nodes}).
maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
do_collect_garbage(St);
maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) ->
@ -119,36 +123,41 @@ maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = Finish
true ->
St;
false ->
reset_timer(do_collect_garbage(St))
start_timer(do_collect_garbage(cancel_timer(St)))
end.
do_collect_garbage(St = #st{storage = Storage}) ->
do_collect_garbage(St = #st{}) ->
emqx_ft_storage:with_storage_type(local, fun(Storage) ->
Stats = collect_garbage(Storage),
ok = maybe_report(Stats, St),
St#st{last_gc = Stats}.
ok = maybe_report(Stats, Storage),
St#st{last_gc = Stats}
end).
maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 ->
maybe_report(#gcstats{errors = Errors}, Storage) when map_size(Errors) > 0 ->
?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage});
maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
maybe_report(#gcstats{} = _Stats, _Storage) ->
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
case emqx_ft_conf:gc_interval(Storage) of
Delay when Delay > 0 ->
St#st{next_gc_timer = emqx_utils:start_timer(Delay, collect)};
0 ->
?SLOG(warning, #{msg => "periodic_gc_disabled"}),
St
end.
start_timer(St) ->
start_timer(gc_interval(emqx_ft_conf:storage()), St).
reset_timer(St = #st{next_gc_timer = undefined}) ->
start_timer(St);
reset_timer(St = #st{next_gc_timer = TRef}) ->
start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
start_timer(Interval, St) ->
?SLOG(warning, #{msg => "periodic_gc_disabled", interval => Interval}),
St.
cancel_timer(St = #st{next_gc_timer = undefined}) ->
St;
cancel_timer(St = #st{next_gc_timer = TRef}) ->
ok = emqx_utils:cancel_timer(TRef),
start_timer(St#st{next_gc_timer = undefined}).
St#st{next_gc_timer = undefined}.
gc_enabled(St) ->
emqx_ft_conf:gc_interval(St#st.storage) > 0.
gc_enabled(Storage) ->
?IS_ENABLED(gc_interval(Storage)).
gc_interval(Storage) ->
emqx_ft_conf:gc_interval(Storage).
%%
@ -175,8 +184,13 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
% heuristic we only delete transfer directory itself only if it is also outdated
% _and was empty at the start of GC_, as a precaution against races between
% writers and GCs.
TTL = get_segments_ttl(Storage, TransferInfo),
Cutoff = erlang:system_time(second) - TTL,
Cutoff =
case get_segments_ttl(Storage, TransferInfo) of
TTL when is_integer(TTL) ->
erlang:system_time(second) - TTL;
undefined ->
0
end,
{FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
{TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
% TODO: collect empty directories separately
@ -338,16 +352,17 @@ filepath_to_binary(S) ->
unicode:characters_to_binary(S, unicode, file:native_name_encoding()).
get_segments_ttl(Storage, TransferInfo) ->
{MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)).
clamp(emqx_ft_conf:segments_ttl(Storage), try_get_filemeta_ttl(TransferInfo)).
try_get_filemeta_ttl(#{filemeta := Filemeta}) ->
maps:get(segments_ttl, Filemeta, undefined);
try_get_filemeta_ttl(#{}) ->
undefined.
clamp(Min, Max, V) ->
min(Max, max(Min, V)).
clamp({Min, Max}, V) ->
min(Max, max(Min, V));
clamp(undefined, V) ->
V.
%%

View File

@ -61,5 +61,5 @@ init([]) ->
modules => [emqx_ft_responder_sup]
},
ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()],
ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -46,8 +46,8 @@ init_per_testcase(TC, Config) ->
ok = snabbkaffe:start_trace(),
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
{storage_root, "file_transfer_root"},
{exports_root, "file_transfer_exports"},
{storage_root, <<"file_transfer_root">>},
{exports_root, <<"file_transfer_exports">>},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
@ -246,13 +246,17 @@ exporter(Config) ->
emqx_ft_storage_exporter:exporter(storage(Config)).
storage(Config) ->
#{
type => local,
segments => #{
root => ?config(storage_root, Config)
maps:get(
storage,
emqx_ft_schema:translate(#{
<<"storage">> => #{
<<"type">> => <<"local">>,
<<"segments">> => #{
<<"root">> => ?config(storage_root, Config)
},
exporter => #{
type => local,
root => ?config(exports_root, Config)
<<"exporter">> => #{
<<"root">> => ?config(exports_root, Config)
}
}.
}
})
).

View File

@ -21,6 +21,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
@ -85,3 +86,86 @@ t_update_config(_Config) ->
5 * 60 * 1000,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
).
t_remove_restore_config(Config) ->
?assertMatch(
{ok, _},
emqx_conf:update([file_transfer, storage], #{<<"type">> => <<"local">>}, #{})
),
?assertEqual(
60 * 60 * 1000,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
),
% Verify that transfers work
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
?assertMatch(
{ok, _},
emqx_conf:remove([file_transfer, storage], #{})
),
?assertEqual(
undefined,
emqx_ft_conf:storage()
),
?assertEqual(
undefined,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
),
ClientId = gen_clientid(),
Client = emqx_ft_test_helpers:start_client(ClientId),
% Verify that transfers fail cleanly when storage is disabled
?check_trace(
?assertMatch(
{ok, #{reason_code_name := unspecified_error}},
emqtt:publish(
Client,
<<"$file/f2/init">>,
emqx_utils_json:encode(emqx_ft:encode_filemeta(#{name => "f2", size => 42})),
1
)
),
fun(Trace) ->
?assertMatch(
[#{transfer := {ClientId, <<"f2">>}, reason := {error, disabled}}],
?of_kind("store_filemeta_failed", Trace)
)
end
),
% Restore local storage backend
Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
?assertMatch(
{ok, _},
emqx_conf:update(
[file_transfer, storage],
#{
<<"type">> => <<"local">>,
<<"segments">> => #{
<<"root">> => Root,
<<"gc">> => #{<<"interval">> => <<"1s">>}
}
},
#{}
)
),
% Verify that GC is getting triggered eventually
?check_trace(
?block_until(#{?snk_kind := garbage_collection}, 5000, 0),
fun(Trace) ->
?assertMatch(
[
#{
?snk_kind := garbage_collection,
storage := #{
type := local,
segments := #{root := Root}
}
}
],
?of_kind(garbage_collection, Trace)
)
end
),
% Verify that transfers work again
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
gen_clientid() ->
emqx_base62:encode(emqx_guid:gen()).

View File

@ -68,7 +68,7 @@ end_per_testcase(_TC, _Config) ->
t_gc_triggers_periodically(_Config) ->
Interval = 500,
ok = set_gc_config(interval, Interval),
ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
ok = emqx_ft_storage_fs_gc:reset(),
?check_trace(
timer:sleep(Interval * 3),
fun(Trace) ->
@ -92,7 +92,7 @@ t_gc_triggers_manually(_Config) ->
?assertMatch(
#gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when
map_size(Errors) == 0,
emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage())
emqx_ft_storage_fs_gc:collect()
),
fun(Trace) ->
[Event] = ?of_kind(garbage_collection, Trace),
@ -108,7 +108,7 @@ t_gc_complete_transfers(_Config) ->
ok = set_gc_config(minimum_segments_ttl, 0),
ok = set_gc_config(maximum_segments_ttl, 3),
ok = set_gc_config(interval, 500),
ok = emqx_ft_storage_fs_gc:reset(Storage),
ok = emqx_ft_storage_fs_gc:reset(),
Transfers = [
{
T1 = {<<"client1">>, mk_file_id()},
@ -134,7 +134,7 @@ t_gc_complete_transfers(_Config) ->
?assertEqual([S1, S2, S3], TransferSizes),
?assertMatch(
#gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0,
emqx_ft_storage_fs_gc:collect(Storage)
emqx_ft_storage_fs_gc:collect()
),
% 2. Complete just the first transfer
{ok, {ok, Event}} = ?wait_async_action(
@ -224,7 +224,7 @@ t_gc_incomplete_transfers(_Config) ->
_ = emqx_utils:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
% 2. Enable periodic GC every 0.5 seconds.
ok = set_gc_config(interval, 500),
ok = emqx_ft_storage_fs_gc:reset(Storage),
ok = emqx_ft_storage_fs_gc:reset(),
% 3. First we need the first transfer to be collected.
{ok, _} = ?block_until(
#{
@ -304,7 +304,7 @@ t_gc_handling_errors(_Config) ->
{directory, DirTransfer2} := eexist
}
} when Files == ?NSEGS(Size, SegSize) * 2 andalso Space > Size * 2,
emqx_ft_storage_fs_gc:collect(Storage)
emqx_ft_storage_fs_gc:collect()
),
fun(Trace) ->
?assertMatch(

View File

@ -68,15 +68,22 @@ tcp_port(Node) ->
root(Config, Node, Tail) ->
filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
start_client(ClientId) ->
start_client(ClientId, node()).
start_client(ClientId, Node) ->
Port = tcp_port(Node),
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
{ok, _} = emqtt:connect(Client),
Client.
upload_file(ClientId, FileId, Name, Data) ->
upload_file(ClientId, FileId, Name, Data, node()).
upload_file(ClientId, FileId, Name, Data, Node) ->
Port = tcp_port(Node),
Size = byte_size(Data),
C1 = start_client(ClientId, Node),
{ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
{ok, _} = emqtt:connect(C1),
Size = byte_size(Data),
Meta = #{
name => Name,
expire_at => erlang:system_time(_Unit = second) + 3600,