diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl
index 0b919f7ab..2629bc737 100644
--- a/apps/emqx/src/emqx_maybe.erl
+++ b/apps/emqx/src/emqx_maybe.erl
@@ -23,6 +23,9 @@
-export([define/2]).
-export([apply/2]).
+-type t(T) :: maybe(T).
+-export_type([t/1]).
+
-spec to_list(maybe(A)) -> [A].
to_list(undefined) ->
[];
diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index 95982b849..42611e537 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -198,8 +198,7 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
end.
on_init(PacketId, Msg, Transfer, Meta) ->
- ?SLOG(info, #{
- msg => "on_init",
+ ?tp(info, "file_transfer_init", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
@@ -229,8 +228,7 @@ on_abort(_Msg, _FileId) ->
?RC_SUCCESS.
on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
- ?SLOG(info, #{
- msg => "on_segment",
+ ?tp(info, "file_transfer_segment", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
@@ -255,8 +253,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end).
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
- ?SLOG(info, #{
- msg => "on_fin",
+ ?tp(info, "file_transfer_fin", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
@@ -301,8 +298,8 @@ store_filemeta(Transfer, Segment) ->
emqx_ft_storage:store_filemeta(Transfer, Segment)
catch
C:E:S ->
- ?SLOG(error, #{
- msg => "start_store_filemeta_failed", class => C, reason => E, stacktrace => S
+ ?tp(error, "start_store_filemeta_failed", #{
+ class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@@ -312,8 +309,8 @@ store_segment(Transfer, Segment) ->
emqx_ft_storage:store_segment(Transfer, Segment)
catch
C:E:S ->
- ?SLOG(error, #{
- msg => "start_store_segment_failed", class => C, reason => E, stacktrace => S
+ ?tp(error, "start_store_segment_failed", #{
+ class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@@ -323,8 +320,8 @@ assemble(Transfer, FinalSize) ->
emqx_ft_storage:assemble(Transfer, FinalSize)
catch
C:E:S ->
- ?SLOG(error, #{
- msg => "start_assemble_failed", class => C, reason => E, stacktrace => S
+ ?tp(error, "start_assemble_failed", #{
+ class => C, reason => E, stacktrace => S
}),
{error, {internal_error, E}}
end.
@@ -334,8 +331,7 @@ transfer(Msg, FileId) ->
{clientid_to_binary(ClientId), FileId}.
on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
- ?SLOG(debug, #{
- msg => "on_complete",
+ ?tp(debug, "on_complete", #{
operation => Op,
packet_id => PacketId,
transfer => Transfer
@@ -344,15 +340,13 @@ on_complete(Op, {ChanPid, PacketId}, Transfer, Result) ->
{Mode, ok} when Mode == ack orelse Mode == down ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
{Mode, {error, _} = Reason} when Mode == ack orelse Mode == down ->
- ?SLOG(error, #{
- msg => Op ++ "_failed",
+ ?tp(error, Op ++ "_failed", #{
transfer => Transfer,
reason => Reason
}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
timeout ->
- ?SLOG(error, #{
- msg => Op ++ "_timed_out",
+ ?tp(error, Op ++ "_timed_out", #{
transfer => Transfer
}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl
index 9b1513b46..0bac6b592 100644
--- a/apps/emqx_ft/src/emqx_ft_app.erl
+++ b/apps/emqx_ft/src/emqx_ft_app.erl
@@ -22,11 +22,9 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_ft_sup:start_link(),
- ok = emqx_ft:hook(),
ok = emqx_ft_conf:load(),
{ok, Sup}.
stop(_State) ->
ok = emqx_ft_conf:unload(),
- ok = emqx_ft:unhook(),
ok.
diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl
index 3a352cd10..767930f98 100644
--- a/apps/emqx_ft/src/emqx_ft_assembler.erl
+++ b/apps/emqx_ft/src/emqx_ft_assembler.erl
@@ -159,7 +159,7 @@ handle_event(internal, _, {assemble, []}, St = #{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #{export := Export}) ->
Result = emqx_ft_storage_exporter:complete(Export),
- ok = maybe_garbage_collect(Result, St),
+ _ = maybe_garbage_collect(Result, St),
{stop, {shutdown, Result}, maps:remove(export, St)}.
-spec terminate(_Reason, state(), stdata()) -> _.
diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl
index dd9806e95..90b59c8d1 100644
--- a/apps/emqx_ft/src/emqx_ft_conf.erl
+++ b/apps/emqx_ft/src/emqx_ft_conf.erl
@@ -23,6 +23,7 @@
-include_lib("emqx/include/logger.hrl").
%% Accessors
+-export([enabled/0]).
-export([storage/0]).
-export([gc_interval/1]).
-export([segments_ttl/1]).
@@ -45,49 +46,32 @@
-type milliseconds() :: non_neg_integer().
-type seconds() :: non_neg_integer().
-%% 5 minutes (s)
--define(DEFAULT_MIN_SEGMENTS_TTL, 300).
-%% 1 day (s)
--define(DEFAULT_MAX_SEGMENTS_TTL, 86400).
-%% 1 minute (ms)
--define(DEFAULT_GC_INTERVAL, 60000).
-
%%--------------------------------------------------------------------
%% Accessors
%%--------------------------------------------------------------------
+-spec enabled() -> boolean().
+enabled() ->
+ emqx_config:get([file_transfer, enable], false).
+
-spec storage() -> _Storage.
storage() ->
- emqx_config:get([file_transfer, storage]).
+ emqx_config:get([file_transfer, storage], undefined).
--spec gc_interval(_Storage) -> milliseconds().
-gc_interval(_Storage) ->
- Conf = assert_storage(local),
- emqx_utils_maps:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL).
+-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
+gc_interval(Conf = #{type := local}) ->
+ emqx_utils_maps:deep_get([segments, gc, interval], Conf);
+gc_interval(_) ->
+ undefined.
--spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
-segments_ttl(_Storage) ->
- Conf = assert_storage(local),
+-spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
+segments_ttl(Conf = #{type := local}) ->
{
- emqx_utils_maps:deep_get(
- [segments, gc, minimum_segments_ttl],
- Conf,
- ?DEFAULT_MIN_SEGMENTS_TTL
- ),
- emqx_utils_maps:deep_get(
- [segments, gc, maximum_segments_ttl],
- Conf,
- ?DEFAULT_MAX_SEGMENTS_TTL
- )
- }.
-
-assert_storage(Type) ->
- case storage() of
- Conf = #{type := Type} ->
- Conf;
- Conf ->
- error({inapplicable, Conf})
- end.
+ emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
+ emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
+ };
+segments_ttl(_) ->
+ undefined.
init_timeout() ->
emqx_config:get([file_transfer, init_timeout]).
@@ -104,10 +88,7 @@ store_segment_timeout() ->
-spec load() -> ok.
load() ->
- ok = emqx_ft_storage_exporter:update_exporter(
- undefined,
- storage()
- ),
+ ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
@@ -131,7 +112,26 @@ pre_config_update(_, Req, _Config) ->
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) ->
- OldStorageConfig = maps:get(storage, OldConfig, undefined),
- NewStorageConfig = maps:get(storage, NewConfig, undefined),
- emqx_ft_storage_exporter:update_exporter(OldStorageConfig, NewStorageConfig).
+post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
+ on_config_update(OldConfig, NewConfig).
+
+on_config_update(OldConfig, NewConfig) ->
+ lists:foreach(
+ fun(ConfKey) ->
+ on_config_update(
+ ConfKey,
+ maps:get(ConfKey, OldConfig, undefined),
+ maps:get(ConfKey, NewConfig, undefined)
+ )
+ end,
+ [storage, enable]
+ ).
+
+on_config_update(_, Config, Config) ->
+ ok;
+on_config_update(storage, OldConfig, NewConfig) ->
+ ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
+on_config_update(enable, _, true) ->
+ ok = emqx_ft:hook();
+on_config_update(enable, _, false) ->
+ ok = emqx_ft:unhook().
diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl
index ce3710cc1..e2eebbbb8 100644
--- a/apps/emqx_ft/src/emqx_ft_schema.erl
+++ b/apps/emqx_ft/src/emqx_ft_schema.erl
@@ -25,6 +25,8 @@
-export([schema/1]).
+-export([translate/1]).
+
-type json_value() ::
null
| boolean()
@@ -53,6 +55,15 @@ roots() -> [file_transfer].
fields(file_transfer) ->
[
+ {enable,
+ mk(
+ boolean(),
+ #{
+ desc => ?DESC("enable"),
+ required => false,
+ default => false
+ }
+ )},
{init_timeout,
mk(
emqx_schema:duration_ms(),
@@ -82,13 +93,22 @@ fields(file_transfer) ->
)},
{storage,
mk(
- hoconsc:union([
- ref(local_storage)
- ]),
+ hoconsc:union(
+ fun
+ (all_union_members) ->
+ [ref(local_storage)];
+ ({value, #{<<"type">> := <<"local">>}}) ->
+ [ref(local_storage)];
+ ({value, #{<<"type">> := _}}) ->
+ throw(#{field_name => type, expected => "local"});
+ ({value, _}) ->
+ [ref(local_storage)]
+ end
+ ),
#{
required => false,
desc => ?DESC("storage"),
- default => default_storage()
+ default => #{<<"type">> => <<"local">>}
}
)}
];
@@ -108,18 +128,38 @@ fields(local_storage) ->
ref(local_storage_segments),
#{
desc => ?DESC("local_storage_segments"),
- required => false
+ required => false,
+ default => #{
+ <<"gc">> => #{}
+ }
}
)},
{exporter,
mk(
- hoconsc:union([
- ref(local_storage_exporter),
- ref(s3_exporter)
- ]),
+ hoconsc:union(
+ fun
+ (all_union_members) ->
+ [
+ ref(local_storage_exporter),
+ ref(s3_exporter)
+ ];
+ ({value, #{<<"type">> := <<"local">>}}) ->
+ [ref(local_storage_exporter)];
+ ({value, #{<<"type">> := <<"s3">>}}) ->
+ [ref(s3_exporter)];
+ ({value, #{<<"type">> := _}}) ->
+ throw(#{field_name => type, expected => "local | s3"});
+ ({value, _}) ->
+ % NOTE: default
+ [ref(local_storage_exporter)]
+ end
+ ),
#{
desc => ?DESC("local_storage_exporter"),
- required => true
+ required => true,
+ default => #{
+ <<"type">> => <<"local">>
+ }
}
)}
];
@@ -277,10 +317,11 @@ converter(unicode_string) ->
ref(Ref) ->
ref(?MODULE, Ref).
-default_storage() ->
- #{
- <<"type">> => <<"local">>,
- <<"exporter">> => #{
- <<"type">> => <<"local">>
- }
- }.
+translate(Conf) ->
+ [Root] = roots(),
+ maps:get(
+ Root,
+ hocon_tconf:check_plain(
+ ?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root]
+ )
+ ).
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl
index 79538a9c7..fee16cd09 100644
--- a/apps/emqx_ft/src/emqx_ft_storage.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage.erl
@@ -27,7 +27,9 @@
files/0,
with_storage_type/2,
- with_storage_type/3
+ with_storage_type/3,
+
+ on_config_update/2
]
).
@@ -90,26 +92,35 @@ child_spec() ->
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
ok | {async, pid()} | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
- Mod = mod(),
- Mod:store_filemeta(storage(), Transfer, FileMeta).
+ with_storage(store_filemeta, [Transfer, FileMeta]).
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}.
store_segment(Transfer, Segment) ->
- Mod = mod(),
- Mod:store_segment(storage(), Transfer, Segment).
+ with_storage(store_segment, [Transfer, Segment]).
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size) ->
- Mod = mod(),
- Mod:assemble(storage(), Transfer, Size).
+ with_storage(assemble, [Transfer, Size]).
-spec files() ->
{ok, [file_info()]} | {error, term()}.
files() ->
- Mod = mod(),
- Mod:files(storage()).
+ with_storage(files, []).
+
+-spec with_storage(atom() | function()) -> any().
+with_storage(Fun) ->
+ with_storage(Fun, []).
+
+-spec with_storage(atom() | function(), list(term())) -> any().
+with_storage(Fun, Args) ->
+ case storage() of
+ Storage = #{} ->
+ apply_storage(Storage, Fun, Args);
+ undefined ->
+ {error, disabled}
+ end.
-spec with_storage_type(atom(), atom() | function()) -> any().
with_storage_type(Type, Fun) ->
@@ -117,17 +128,61 @@ with_storage_type(Type, Fun) ->
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
- Storage = storage(),
- case Storage of
- #{type := Type} when is_function(Fun) ->
- apply(Fun, [Storage | Args]);
- #{type := Type} when is_atom(Fun) ->
- Mod = mod(Storage),
- apply(Mod, Fun, [Storage | Args]);
- disabled ->
- {error, disabled};
- _ ->
- {error, {invalid_storage_type, Type}}
+ with_storage(fun(Storage) ->
+ case Storage of
+ #{type := Type} ->
+ apply_storage(Storage, Fun, Args);
+ _ ->
+ {error, {invalid_storage_type, Storage}}
+ end
+ end).
+
+apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
+ apply(mod(Storage), Fun, [Storage | Args]);
+apply_storage(Storage, Fun, Args) when is_function(Fun) ->
+ apply(Fun, [Storage | Args]).
+
+%%
+
+-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
+ ok.
+on_config_update(Storage, Storage) ->
+ ok;
+on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
+ ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
+on_config_update(StorageOld, StorageNew) ->
+ _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
+ _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
+ _ = emqx_maybe:apply(
+ fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
+ StorageNew
+ ),
+ ok.
+
+on_storage_start(Storage = #{type := _}) ->
+ lists:foreach(
+ fun(ChildSpec) ->
+ {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
+ end,
+ child_spec(Storage)
+ ).
+
+on_storage_stop(Storage = #{type := _}) ->
+ lists:foreach(
+ fun(#{id := ChildId}) ->
+ _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
+ ok = supervisor:delete_child(emqx_ft_sup, ChildId)
+ end,
+ child_spec(Storage)
+ ).
+
+child_spec(Storage) ->
+ try
+ Mod = mod(Storage),
+ Mod:child_spec(Storage)
+ catch
+ error:disabled -> [];
+ error:undef -> []
end.
%%--------------------------------------------------------------------
@@ -144,7 +199,6 @@ mod(Storage) ->
case Storage of
#{type := local} ->
emqx_ft_storage_fs;
- disabled ->
+ undefined ->
error(disabled)
- % emqx_ft_storage_dummy
end.
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
index 61241bd6f..72128cb40 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
@@ -31,7 +31,7 @@
-export([list/1]).
%% Lifecycle API
--export([update_exporter/2]).
+-export([on_config_update/2]).
%% Internal API
-export([exporter/1]).
@@ -141,30 +141,28 @@ list(Storage) ->
%% Lifecycle
--spec update_exporter(emqx_config:config(), emqx_config:config()) -> ok | {error, term()}.
-update_exporter(
- #{exporter := #{type := OldType}} = OldConfig,
- #{exporter := #{type := OldType}} = NewConfig
-) ->
- {ExporterMod, OldExporterOpts} = exporter(OldConfig),
- {_NewExporterMod, NewExporterOpts} = exporter(NewConfig),
- ExporterMod:update(OldExporterOpts, NewExporterOpts);
-update_exporter(
- #{exporter := _} = OldConfig,
- #{exporter := _} = NewConfig
-) ->
- {OldExporterMod, OldExporterOpts} = exporter(OldConfig),
- {NewExporterMod, NewExporterOpts} = exporter(NewConfig),
- ok = OldExporterMod:stop(OldExporterOpts),
- NewExporterMod:start(NewExporterOpts);
-update_exporter(undefined, NewConfig) ->
- {ExporterMod, ExporterOpts} = exporter(NewConfig),
- ExporterMod:start(ExporterOpts);
-update_exporter(OldConfig, undefined) ->
- {ExporterMod, ExporterOpts} = exporter(OldConfig),
- ExporterMod:stop(ExporterOpts);
-update_exporter(_, _) ->
+-spec on_config_update(storage(), storage()) -> ok | {error, term()}.
+on_config_update(StorageOld, StorageNew) ->
+ on_exporter_update(
+ emqx_maybe:apply(fun exporter/1, StorageOld),
+ emqx_maybe:apply(fun exporter/1, StorageNew)
+ ).
+
+on_exporter_update(Config, Config) ->
+ ok;
+on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
+ ExporterMod:update(ConfigOld, ConfigNew);
+on_exporter_update(ExporterOld, ExporterNew) ->
+ _ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld),
+ _ = emqx_maybe:apply(fun start_exporter/1, ExporterNew),
ok.
+
+start_exporter({ExporterMod, ExporterOpts}) ->
+ ok = ExporterMod:start(ExporterOpts).
+
+stop_exporter({ExporterMod, ExporterOpts}) ->
+ ok = ExporterMod:stop(ExporterOpts).
+
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
index 1b1d9ecf7..823407307 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
@@ -47,6 +47,8 @@
-export([files/1]).
+-export([on_config_update/2]).
+
-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
@@ -96,6 +98,7 @@
}.
-type storage() :: #{
+ type := 'local',
segments := segments(),
exporter := emqx_ft_storage_exporter:exporter()
}.
@@ -219,6 +222,13 @@ files(Storage) ->
%%
+on_config_update(StorageOld, StorageNew) ->
+ % NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
+ ok = emqx_ft_storage_fs_gc:reset(StorageNew),
+ emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
+
+%%
+
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
index 0f61e65b7..692a270e3 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
@@ -29,8 +29,9 @@
-export([start_link/1]).
--export([collect/1]).
+-export([collect/0]).
-export([collect/3]).
+-export([reset/0]).
-export([reset/1]).
-behaviour(gen_server).
@@ -40,38 +41,43 @@
-export([handle_info/2]).
-record(st, {
- storage :: emqx_ft_storage_fs:storage(),
next_gc_timer :: maybe(reference()),
last_gc :: maybe(gcstats())
}).
-type gcstats() :: #gcstats{}.
+-define(IS_ENABLED(INTERVAL), (is_integer(INTERVAL) andalso INTERVAL > 0)).
+
%%
start_link(Storage) ->
- gen_server:start_link(mk_server_ref(Storage), ?MODULE, Storage, []).
+ gen_server:start_link(mk_server_ref(global), ?MODULE, Storage, []).
--spec collect(emqx_ft_storage_fs:storage()) -> gcstats().
-collect(Storage) ->
- gen_server:call(mk_server_ref(Storage), {collect, erlang:system_time()}, infinity).
+-spec collect() -> gcstats().
+collect() ->
+ gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
+
+-spec reset() -> ok.
+reset() ->
+ reset(emqx_ft_conf:storage()).
-spec reset(emqx_ft_storage_fs:storage()) -> ok.
reset(Storage) ->
- gen_server:cast(mk_server_ref(Storage), reset).
+ gen_server:cast(mk_server_ref(global), {reset, gc_interval(Storage)}).
collect(Storage, Transfer, Nodes) ->
- gen_server:cast(mk_server_ref(Storage), {collect, Transfer, Nodes}).
+ gc_enabled(Storage) andalso cast_collect(mk_server_ref(global), Storage, Transfer, Nodes).
-mk_server_ref(Storage) ->
+mk_server_ref(Name) ->
% TODO
- {via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}.
+ {via, gproc, {n, l, {?MODULE, Name}}}.
%%
init(Storage) ->
- St = #st{storage = Storage},
- {ok, start_timer(St)}.
+ St = #st{},
+ {ok, start_timer(gc_interval(Storage), St)}.
handle_call({collect, CalledAt}, _From, St) ->
StNext = maybe_collect_garbage(CalledAt, St),
@@ -80,22 +86,17 @@ handle_call(Call, From, St) ->
?SLOG(error, #{msg => "unexpected_call", call => Call, from => From}),
{noreply, St}.
-handle_cast({collect, Transfer, [Node | Rest]}, St) ->
- case gc_enabled(St) of
- true ->
- ok = do_collect_transfer(Transfer, Node, St),
- case Rest of
- [_ | _] ->
- gen_server:cast(self(), {collect, Transfer, Rest});
- [] ->
- ok
- end;
- false ->
- skip
+handle_cast({collect, Storage, Transfer, [Node | Rest]}, St) ->
+ ok = do_collect_transfer(Storage, Transfer, Node, St),
+ case Rest of
+ [_ | _] ->
+ cast_collect(self(), Storage, Transfer, Rest);
+ [] ->
+ ok
end,
{noreply, St};
-handle_cast(reset, St) ->
- {noreply, reset_timer(St)};
+handle_cast({reset, Interval}, St) ->
+ {noreply, start_timer(Interval, cancel_timer(St))};
handle_cast(Cast, St) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Cast}),
{noreply, St}.
@@ -104,14 +105,17 @@ handle_info({timeout, TRef, collect}, St = #st{next_gc_timer = TRef}) ->
StNext = do_collect_garbage(St),
{noreply, start_timer(StNext#st{next_gc_timer = undefined})}.
-do_collect_transfer(Transfer, Node, St = #st{storage = Storage}) when Node == node() ->
+do_collect_transfer(Storage, Transfer, Node, St = #st{}) when Node == node() ->
Stats = try_collect_transfer(Storage, Transfer, complete, init_gcstats()),
ok = maybe_report(Stats, St),
ok;
-do_collect_transfer(_Transfer, _Node, _St = #st{}) ->
+do_collect_transfer(_Storage, _Transfer, _Node, _St = #st{}) ->
% TODO
ok.
+cast_collect(Ref, Storage, Transfer, Nodes) ->
+ gen_server:cast(Ref, {collect, Storage, Transfer, Nodes}).
+
maybe_collect_garbage(_CalledAt, St = #st{last_gc = undefined}) ->
do_collect_garbage(St);
maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = FinishedAt}}) ->
@@ -119,36 +123,41 @@ maybe_collect_garbage(CalledAt, St = #st{last_gc = #gcstats{finished_at = Finish
true ->
St;
false ->
- reset_timer(do_collect_garbage(St))
+ start_timer(do_collect_garbage(cancel_timer(St)))
end.
-do_collect_garbage(St = #st{storage = Storage}) ->
- Stats = collect_garbage(Storage),
- ok = maybe_report(Stats, St),
- St#st{last_gc = Stats}.
+do_collect_garbage(St = #st{}) ->
+ emqx_ft_storage:with_storage_type(local, fun(Storage) ->
+ Stats = collect_garbage(Storage),
+ ok = maybe_report(Stats, Storage),
+ St#st{last_gc = Stats}
+ end).
-maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Errors) > 0 ->
+maybe_report(#gcstats{errors = Errors}, Storage) when map_size(Errors) > 0 ->
?tp(warning, "garbage_collection_errors", #{errors => Errors, storage => Storage});
-maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
+maybe_report(#gcstats{} = _Stats, _Storage) ->
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
-start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
- case emqx_ft_conf:gc_interval(Storage) of
- Delay when Delay > 0 ->
- St#st{next_gc_timer = emqx_utils:start_timer(Delay, collect)};
- 0 ->
- ?SLOG(warning, #{msg => "periodic_gc_disabled"}),
- St
- end.
+start_timer(St) ->
+ start_timer(gc_interval(emqx_ft_conf:storage()), St).
-reset_timer(St = #st{next_gc_timer = undefined}) ->
- start_timer(St);
-reset_timer(St = #st{next_gc_timer = TRef}) ->
+start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
+ St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
+start_timer(Interval, St) ->
+ ?SLOG(warning, #{msg => "periodic_gc_disabled", interval => Interval}),
+ St.
+
+cancel_timer(St = #st{next_gc_timer = undefined}) ->
+ St;
+cancel_timer(St = #st{next_gc_timer = TRef}) ->
ok = emqx_utils:cancel_timer(TRef),
- start_timer(St#st{next_gc_timer = undefined}).
+ St#st{next_gc_timer = undefined}.
-gc_enabled(St) ->
- emqx_ft_conf:gc_interval(St#st.storage) > 0.
+gc_enabled(Storage) ->
+ ?IS_ENABLED(gc_interval(Storage)).
+
+gc_interval(Storage) ->
+ emqx_ft_conf:gc_interval(Storage).
%%
@@ -175,8 +184,13 @@ try_collect_transfer(Storage, Transfer, TransferInfo = #{}, Stats) ->
% heuristic we only delete transfer directory itself only if it is also outdated
% _and was empty at the start of GC_, as a precaution against races between
% writers and GCs.
- TTL = get_segments_ttl(Storage, TransferInfo),
- Cutoff = erlang:system_time(second) - TTL,
+ Cutoff =
+ case get_segments_ttl(Storage, TransferInfo) of
+ TTL when is_integer(TTL) ->
+ erlang:system_time(second) - TTL;
+ undefined ->
+ 0
+ end,
{FragCleaned, Stats1} = collect_outdated_fragments(Storage, Transfer, Cutoff, Stats),
{TempCleaned, Stats2} = collect_outdated_tempfiles(Storage, Transfer, Cutoff, Stats1),
% TODO: collect empty directories separately
@@ -338,16 +352,17 @@ filepath_to_binary(S) ->
unicode:characters_to_binary(S, unicode, file:native_name_encoding()).
get_segments_ttl(Storage, TransferInfo) ->
- {MinTTL, MaxTTL} = emqx_ft_conf:segments_ttl(Storage),
- clamp(MinTTL, MaxTTL, try_get_filemeta_ttl(TransferInfo)).
+ clamp(emqx_ft_conf:segments_ttl(Storage), try_get_filemeta_ttl(TransferInfo)).
try_get_filemeta_ttl(#{filemeta := Filemeta}) ->
maps:get(segments_ttl, Filemeta, undefined);
try_get_filemeta_ttl(#{}) ->
undefined.
-clamp(Min, Max, V) ->
- min(Max, max(Min, V)).
+clamp({Min, Max}, V) ->
+ min(Max, max(Min, V));
+clamp(undefined, V) ->
+ V.
%%
diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl
index 3c28eae30..8d388814c 100644
--- a/apps/emqx_ft/src/emqx_ft_sup.erl
+++ b/apps/emqx_ft/src/emqx_ft_sup.erl
@@ -61,5 +61,5 @@ init([]) ->
modules => [emqx_ft_responder_sup]
},
- ChildSpecs = [Responder, AssemblerSup, FileReaderSup | emqx_ft_storage:child_spec()],
+ ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
{ok, {SupFlags, ChildSpecs}}.
diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl
index 151b8e5fe..d3a3aee21 100644
--- a/apps/emqx_ft/test/emqx_ft_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl
@@ -63,6 +63,7 @@ set_special_configs(Config) ->
% NOTE
% Inhibit local fs GC to simulate it isn't fast enough to collect
% complete transfers.
+ enable => true,
storage => emqx_utils_maps:deep_merge(
Storage,
#{segments => #{gc => #{interval => 0}}}
diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
index 5f3b213fb..523026d5a 100644
--- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
@@ -30,7 +30,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:init_suite(
- [emqx_conf, emqx_ft], set_special_configs(Config)
+ [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
@@ -38,16 +38,6 @@ end_per_suite(_Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
ok.
-set_special_configs(Config) ->
- fun
- (emqx_ft) ->
- emqx_ft_test_helpers:load_config(#{
- storage => emqx_ft_test_helpers:local_storage(Config)
- });
- (_) ->
- ok
- end.
-
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
index 4b5610f51..a7323fc0e 100644
--- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
@@ -46,8 +46,8 @@ init_per_testcase(TC, Config) ->
ok = snabbkaffe:start_trace(),
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
- {storage_root, "file_transfer_root"},
- {exports_root, "file_transfer_exports"},
+ {storage_root, <<"file_transfer_root">>},
+ {exports_root, <<"file_transfer_exports">>},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
@@ -246,13 +246,17 @@ exporter(Config) ->
emqx_ft_storage_exporter:exporter(storage(Config)).
storage(Config) ->
- #{
- type => local,
- segments => #{
- root => ?config(storage_root, Config)
- },
- exporter => #{
- type => local,
- root => ?config(exports_root, Config)
- }
- }.
+ maps:get(
+ storage,
+ emqx_ft_schema:translate(#{
+ <<"storage">> => #{
+ <<"type">> => <<"local">>,
+ <<"segments">> => #{
+ <<"root">> => ?config(storage_root, Config)
+ },
+ <<"exporter">> => #{
+ <<"root">> => ?config(exports_root, Config)
+ }
+ }
+ })
+ ).
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
index c616681dd..106c34702 100644
--- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
@@ -21,26 +21,26 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
+-include_lib("snabbkaffe/include/test_macros.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
- ok = emqx_common_test_helpers:start_apps(
- [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
- ),
- {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
end_per_suite(_Config) ->
- ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
init_per_testcase(_Case, Config) ->
+ % NOTE: running each testcase with clean config
+ _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
+ ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
+ {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
end_per_testcase(_Case, _Config) ->
- ok.
+ ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+ ok = emqx_config:erase(file_transfer).
%%--------------------------------------------------------------------
%% Tests
@@ -60,6 +60,7 @@ t_update_config(_Config) ->
emqx_conf:update(
[file_transfer],
#{
+ <<"enable">> => true,
<<"storage">> => #{
<<"type">> => <<"local">>,
<<"segments">> => #{
@@ -85,3 +86,153 @@ t_update_config(_Config) ->
5 * 60 * 1000,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
).
+
+t_disable_restore_config(Config) ->
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
+ #{}
+ )
+ ),
+ ?assertEqual(
+ 60 * 60 * 1000,
+ emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+ ),
+ % Verify that transfers work
+ ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>),
+ % Verify that clearing storage settings reverts config to defaults
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => false, <<"storage">> => undefined},
+ #{}
+ )
+ ),
+ ?assertEqual(
+ false,
+ emqx_ft_conf:enabled()
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
+ emqx_ft_conf:storage()
+ ),
+ ClientId = gen_clientid(),
+ Client = emqx_ft_test_helpers:start_client(ClientId),
+ % Verify that transfers fail cleanly when storage is disabled
+ ?check_trace(
+ ?assertMatch(
+ {ok, #{reason_code_name := no_matching_subscribers}},
+ emqtt:publish(
+ Client,
+ <<"$file/f2/init">>,
+ emqx_utils_json:encode(emqx_ft:encode_filemeta(#{name => "f2", size => 42})),
+ 1
+ )
+ ),
+ fun(Trace) ->
+ ?assertMatch([], ?of_kind("file_transfer_init", Trace))
+ end
+ ),
+ ok = emqtt:stop(Client),
+ % Restore local storage backend
+ Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer],
+ #{
+ <<"enable">> => true,
+ <<"storage">> => #{
+ <<"type">> => <<"local">>,
+ <<"segments">> => #{
+ <<"root">> => Root,
+ <<"gc">> => #{<<"interval">> => <<"1s">>}
+ }
+ }
+ },
+ #{}
+ )
+ ),
+ % Verify that GC is getting triggered eventually
+ ?check_trace(
+ ?block_until(#{?snk_kind := garbage_collection}, 5000, 0),
+ fun(Trace) ->
+ ?assertMatch(
+ [
+ #{
+ ?snk_kind := garbage_collection,
+ storage := #{
+ type := local,
+ segments := #{root := Root}
+ }
+ }
+ ],
+ ?of_kind(garbage_collection, Trace)
+ )
+ end
+ ),
+ % Verify that transfers work again
+ ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>).
+
+t_switch_exporter(_Config) ->
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => true},
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that switching to a different exporter works
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer, storage, exporter],
+ #{
+ <<"type">> => <<"s3">>,
+ <<"bucket">> => <<"emqx">>,
+ <<"host">> => <<"https://localhost">>,
+ <<"port">> => 9000,
+ <<"transport_options">> => #{
+ <<"ipv6_probe">> => false
+ }
+ },
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := s3}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that switching back to local exporter works
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:remove(
+ [file_transfer, storage, exporter],
+ #{}
+ )
+ ),
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer, storage, exporter],
+ #{<<"type">> => <<"local">>},
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that transfers work
+ ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>).
+
+gen_clientid() ->
+ emqx_base62:encode(emqx_guid:gen()).
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
index e3decf0f5..d4c13f7d1 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
@@ -35,22 +35,12 @@ groups() ->
].
init_per_suite(Config) ->
- ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)),
+ ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
ok.
-set_special_configs(Config) ->
- fun
- (emqx_ft) ->
- emqx_ft_test_helpers:load_config(#{
- storage => emqx_ft_test_helpers:local_storage(Config)
- });
- (_) ->
- ok
- end.
-
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index abf2749ed..065e9ae0a 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -68,7 +68,7 @@ end_per_testcase(_TC, _Config) ->
t_gc_triggers_periodically(_Config) ->
Interval = 500,
ok = set_gc_config(interval, Interval),
- ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
+ ok = emqx_ft_storage_fs_gc:reset(),
?check_trace(
timer:sleep(Interval * 3),
fun(Trace) ->
@@ -92,7 +92,7 @@ t_gc_triggers_manually(_Config) ->
?assertMatch(
#gcstats{files = 0, directories = 0, space = 0, errors = #{} = Errors} when
map_size(Errors) == 0,
- emqx_ft_storage_fs_gc:collect(emqx_ft_conf:storage())
+ emqx_ft_storage_fs_gc:collect()
),
fun(Trace) ->
[Event] = ?of_kind(garbage_collection, Trace),
@@ -108,7 +108,7 @@ t_gc_complete_transfers(_Config) ->
ok = set_gc_config(minimum_segments_ttl, 0),
ok = set_gc_config(maximum_segments_ttl, 3),
ok = set_gc_config(interval, 500),
- ok = emqx_ft_storage_fs_gc:reset(Storage),
+ ok = emqx_ft_storage_fs_gc:reset(),
Transfers = [
{
T1 = {<<"client1">>, mk_file_id()},
@@ -134,7 +134,7 @@ t_gc_complete_transfers(_Config) ->
?assertEqual([S1, S2, S3], TransferSizes),
?assertMatch(
#gcstats{files = 0, directories = 0, errors = #{} = Es} when map_size(Es) == 0,
- emqx_ft_storage_fs_gc:collect(Storage)
+ emqx_ft_storage_fs_gc:collect()
),
% 2. Complete just the first transfer
{ok, {ok, Event}} = ?wait_async_action(
@@ -224,7 +224,7 @@ t_gc_incomplete_transfers(_Config) ->
_ = emqx_utils:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
% 2. Enable periodic GC every 0.5 seconds.
ok = set_gc_config(interval, 500),
- ok = emqx_ft_storage_fs_gc:reset(Storage),
+ ok = emqx_ft_storage_fs_gc:reset(),
% 3. First we need the first transfer to be collected.
{ok, _} = ?block_until(
#{
@@ -304,7 +304,7 @@ t_gc_handling_errors(_Config) ->
{directory, DirTransfer2} := eexist
}
} when Files == ?NSEGS(Size, SegSize) * 2 andalso Space > Size * 2,
- emqx_ft_storage_fs_gc:collect(Storage)
+ emqx_ft_storage_fs_gc:collect()
),
fun(Trace) ->
?assertMatch(
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
index 704e55454..89e349fae 100644
--- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl
+++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
@@ -41,7 +41,7 @@ stop_additional_node(Node) ->
env_handler(Config) ->
fun
(emqx_ft) ->
- load_config(#{storage => local_storage(Config)});
+ load_config(#{enable => true, storage => local_storage(Config)});
(_) ->
ok
end.
@@ -68,15 +68,22 @@ tcp_port(Node) ->
root(Config, Node, Tail) ->
filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
+start_client(ClientId) ->
+ start_client(ClientId, node()).
+
+start_client(ClientId, Node) ->
+ Port = tcp_port(Node),
+ {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
+ {ok, _} = emqtt:connect(Client),
+ Client.
+
upload_file(ClientId, FileId, Name, Data) ->
upload_file(ClientId, FileId, Name, Data, node()).
upload_file(ClientId, FileId, Name, Data, Node) ->
- Port = tcp_port(Node),
- Size = byte_size(Data),
+ C1 = start_client(ClientId, Node),
- {ok, C1} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]),
- {ok, _} = emqtt:connect(C1),
+ Size = byte_size(Data),
Meta = #{
name => Name,
expire_at => erlang:system_time(_Unit = second) + 3600,
diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl
index e3e1d84af..3bc5861c6 100644
--- a/apps/emqx_s3/src/emqx_s3_client.erl
+++ b/apps/emqx_s3/src/emqx_s3_client.erl
@@ -57,7 +57,7 @@
port := part_number(),
bucket := string(),
headers := headers(),
- acl := emqx_s3:acl(),
+ acl := emqx_s3:acl() | undefined,
url_expire_time := pos_integer(),
access_key_id := string() | undefined,
secret_access_key := string() | undefined,
diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl
index 23e69ec5d..5866f8c2b 100644
--- a/apps/emqx_s3/src/emqx_s3_schema.erl
+++ b/apps/emqx_s3/src/emqx_s3_schema.erl
@@ -105,7 +105,6 @@ fields(s3) ->
bucket_owner_full_control
]),
#{
- default => private,
desc => ?DESC("acl"),
required => false
}
diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl
index 89ec8a958..63f659da0 100644
--- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl
+++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl
@@ -23,7 +23,6 @@ t_minimal_config(_Config) ->
bucket := "bucket",
host := "s3.us-east-1.endpoint.com",
port := 443,
- acl := private,
min_part_size := 5242880,
transport_options :=
#{
diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon
index 28c93e1ef..e7e551289 100644
--- a/rel/i18n/emqx_ft_schema.hocon
+++ b/rel/i18n/emqx_ft_schema.hocon
@@ -1,5 +1,11 @@
emqx_ft_schema {
+enable.desc:
+"""Enable the File Transfer feature.
+Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.
+This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
+on storage-dependent background activities (e.g. garbage collection)."""
+
init_timeout.desc:
"""Timeout for initializing the file transfer.
After reaching the timeout, `init` message will be acked with an error"""