diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index d500a6344..42611e537 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -198,8 +198,7 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
end.
on_init(PacketId, Msg, Transfer, Meta) ->
- ?SLOG(info, #{
- msg => "on_init",
+ ?tp(info, "file_transfer_init", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
@@ -229,8 +228,7 @@ on_abort(_Msg, _FileId) ->
?RC_SUCCESS.
on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
- ?SLOG(info, #{
- msg => "on_segment",
+ ?tp(info, "file_transfer_segment", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
@@ -255,8 +253,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end).
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
- ?SLOG(info, #{
- msg => "on_fin",
+ ?tp(info, "file_transfer_fin", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl
index 9b1513b46..0bac6b592 100644
--- a/apps/emqx_ft/src/emqx_ft_app.erl
+++ b/apps/emqx_ft/src/emqx_ft_app.erl
@@ -22,11 +22,9 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_ft_sup:start_link(),
- ok = emqx_ft:hook(),
ok = emqx_ft_conf:load(),
{ok, Sup}.
stop(_State) ->
ok = emqx_ft_conf:unload(),
- ok = emqx_ft:unhook(),
ok.
diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl
index 1e531ecdb..90b59c8d1 100644
--- a/apps/emqx_ft/src/emqx_ft_conf.erl
+++ b/apps/emqx_ft/src/emqx_ft_conf.erl
@@ -23,6 +23,7 @@
-include_lib("emqx/include/logger.hrl").
%% Accessors
+-export([enabled/0]).
-export([storage/0]).
-export([gc_interval/1]).
-export([segments_ttl/1]).
@@ -49,6 +50,10 @@
%% Accessors
%%--------------------------------------------------------------------
+-spec enabled() -> boolean().
+enabled() ->
+ emqx_config:get([file_transfer, enable], false).
+
-spec storage() -> _Storage.
storage() ->
emqx_config:get([file_transfer, storage], undefined).
@@ -83,7 +88,7 @@ store_segment_timeout() ->
-spec load() -> ok.
load() ->
- ok = emqx_ft_storage:on_config_update(undefined, storage()),
+ ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
@@ -107,7 +112,26 @@ pre_config_update(_, Req, _Config) ->
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) ->
- OldStorageConfig = maps:get(storage, OldConfig, undefined),
- NewStorageConfig = maps:get(storage, NewConfig, undefined),
- emqx_ft_storage:on_config_update(OldStorageConfig, NewStorageConfig).
+post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
+ on_config_update(OldConfig, NewConfig).
+
+on_config_update(OldConfig, NewConfig) ->
+ lists:foreach(
+ fun(ConfKey) ->
+ on_config_update(
+ ConfKey,
+ maps:get(ConfKey, OldConfig, undefined),
+ maps:get(ConfKey, NewConfig, undefined)
+ )
+ end,
+ [storage, enable]
+ ).
+
+on_config_update(_, Config, Config) ->
+ ok;
+on_config_update(storage, OldConfig, NewConfig) ->
+ ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
+on_config_update(enable, _, true) ->
+ ok = emqx_ft:hook();
+on_config_update(enable, _, false) ->
+ ok = emqx_ft:unhook().
diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl
index 27c593b6c..e2eebbbb8 100644
--- a/apps/emqx_ft/src/emqx_ft_schema.erl
+++ b/apps/emqx_ft/src/emqx_ft_schema.erl
@@ -55,6 +55,15 @@ roots() -> [file_transfer].
fields(file_transfer) ->
[
+ {enable,
+ mk(
+ boolean(),
+ #{
+ desc => ?DESC("enable"),
+ required => false,
+ default => false
+ }
+ )},
{init_timeout,
mk(
emqx_schema:duration_ms(),
@@ -87,22 +96,19 @@ fields(file_transfer) ->
hoconsc:union(
fun
(all_union_members) ->
- [
- % NOTE: by default storage is disabled
- undefined,
- ref(local_storage)
- ];
+ [ref(local_storage)];
({value, #{<<"type">> := <<"local">>}}) ->
[ref(local_storage)];
({value, #{<<"type">> := _}}) ->
throw(#{field_name => type, expected => "local"});
- (_) ->
- [undefined]
+ ({value, _}) ->
+ [ref(local_storage)]
end
),
#{
required => false,
- desc => ?DESC("storage")
+ desc => ?DESC("storage"),
+ default => #{<<"type">> => <<"local">>}
}
)}
];
diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl
index 151b8e5fe..d3a3aee21 100644
--- a/apps/emqx_ft/test/emqx_ft_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl
@@ -63,6 +63,7 @@ set_special_configs(Config) ->
% NOTE
% Inhibit local fs GC to simulate it isn't fast enough to collect
% complete transfers.
+ enable => true,
storage => emqx_utils_maps:deep_merge(
Storage,
#{segments => #{gc => #{interval => 0}}}
diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
index 5f3b213fb..523026d5a 100644
--- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
@@ -30,7 +30,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:init_suite(
- [emqx_conf, emqx_ft], set_special_configs(Config)
+ [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
@@ -38,16 +38,6 @@ end_per_suite(_Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
ok.
-set_special_configs(Config) ->
- fun
- (emqx_ft) ->
- emqx_ft_test_helpers:load_config(#{
- storage => emqx_ft_test_helpers:local_storage(Config)
- });
- (_) ->
- ok
- end.
-
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
index 89b0e895d..106c34702 100644
--- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
@@ -26,22 +26,21 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
- ok = emqx_common_test_helpers:start_apps(
- [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
- ),
- {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
end_per_suite(_Config) ->
- ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
init_per_testcase(_Case, Config) ->
+ % NOTE: running each testcase with clean config
+ _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
+ ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
+ {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
end_per_testcase(_Case, _Config) ->
- ok.
+ ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+ ok = emqx_config:erase(file_transfer).
%%--------------------------------------------------------------------
%% Tests
@@ -61,6 +60,7 @@ t_update_config(_Config) ->
emqx_conf:update(
[file_transfer],
#{
+ <<"enable">> => true,
<<"storage">> => #{
<<"type">> => <<"local">>,
<<"segments">> => #{
@@ -87,10 +87,14 @@ t_update_config(_Config) ->
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
).
-t_remove_restore_config(Config) ->
+t_disable_restore_config(Config) ->
?assertMatch(
{ok, _},
- emqx_conf:update([file_transfer, storage], #{<<"type">> => <<"local">>}, #{})
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
+ #{}
+ )
),
?assertEqual(
60 * 60 * 1000,
@@ -98,24 +102,29 @@ t_remove_restore_config(Config) ->
),
% Verify that transfers work
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>),
+ % Verify that clearing storage settings reverts config to defaults
?assertMatch(
{ok, _},
- emqx_conf:remove([file_transfer, storage], #{})
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => false, <<"storage">> => undefined},
+ #{}
+ )
),
?assertEqual(
- undefined,
+ false,
+ emqx_ft_conf:enabled()
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
emqx_ft_conf:storage()
),
- ?assertEqual(
- undefined,
- emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
- ),
ClientId = gen_clientid(),
Client = emqx_ft_test_helpers:start_client(ClientId),
% Verify that transfers fail cleanly when storage is disabled
?check_trace(
?assertMatch(
- {ok, #{reason_code_name := unspecified_error}},
+ {ok, #{reason_code_name := no_matching_subscribers}},
emqtt:publish(
Client,
<<"$file/f2/init">>,
@@ -124,23 +133,24 @@ t_remove_restore_config(Config) ->
)
),
fun(Trace) ->
- ?assertMatch(
- [#{transfer := {ClientId, <<"f2">>}, reason := {error, disabled}}],
- ?of_kind("store_filemeta_failed", Trace)
- )
+ ?assertMatch([], ?of_kind("file_transfer_init", Trace))
end
),
+ ok = emqtt:stop(Client),
% Restore local storage backend
Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
?assertMatch(
{ok, _},
emqx_conf:update(
- [file_transfer, storage],
+ [file_transfer],
#{
- <<"type">> => <<"local">>,
- <<"segments">> => #{
- <<"root">> => Root,
- <<"gc">> => #{<<"interval">> => <<"1s">>}
+ <<"enable">> => true,
+ <<"storage">> => #{
+ <<"type">> => <<"local">>,
+ <<"segments">> => #{
+ <<"root">> => Root,
+ <<"gc">> => #{<<"interval">> => <<"1s">>}
+ }
}
},
#{}
@@ -167,5 +177,62 @@ t_remove_restore_config(Config) ->
% Verify that transfers work again
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>).
+t_switch_exporter(_Config) ->
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer],
+ #{<<"enable">> => true},
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that switching to a different exporter works
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer, storage, exporter],
+ #{
+ <<"type">> => <<"s3">>,
+ <<"bucket">> => <<"emqx">>,
+ <<"host">> => <<"https://localhost">>,
+ <<"port">> => 9000,
+ <<"transport_options">> => #{
+ <<"ipv6_probe">> => false
+ }
+ },
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := s3}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that switching back to local exporter works
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:remove(
+ [file_transfer, storage, exporter],
+ #{}
+ )
+ ),
+ ?assertMatch(
+ {ok, _},
+ emqx_conf:update(
+ [file_transfer, storage, exporter],
+ #{<<"type">> => <<"local">>},
+ #{}
+ )
+ ),
+ ?assertMatch(
+ #{type := local, exporter := #{type := local}},
+ emqx_ft_conf:storage()
+ ),
+ % Verify that transfers work
+ ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>).
+
gen_clientid() ->
emqx_base62:encode(emqx_guid:gen()).
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
index e3decf0f5..d4c13f7d1 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
@@ -35,22 +35,12 @@ groups() ->
].
init_per_suite(Config) ->
- ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)),
+ ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
Config.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
ok.
-set_special_configs(Config) ->
- fun
- (emqx_ft) ->
- emqx_ft_test_helpers:load_config(#{
- storage => emqx_ft_test_helpers:local_storage(Config)
- });
- (_) ->
- ok
- end.
-
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
end_per_testcase(_Case, _Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
index 11ddf191b..89e349fae 100644
--- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl
+++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
@@ -41,7 +41,7 @@ stop_additional_node(Node) ->
env_handler(Config) ->
fun
(emqx_ft) ->
- load_config(#{storage => local_storage(Config)});
+ load_config(#{enable => true, storage => local_storage(Config)});
(_) ->
ok
end.
diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon
index 28c93e1ef..e7e551289 100644
--- a/rel/i18n/emqx_ft_schema.hocon
+++ b/rel/i18n/emqx_ft_schema.hocon
@@ -1,5 +1,11 @@
emqx_ft_schema {
+enable.desc:
+"""Enable the File Transfer feature.
+Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.
+This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
+on storage-dependent background activities (e.g. garbage collection)."""
+
init_timeout.desc:
"""Timeout for initializing the file transfer.
After reaching the timeout, `init` message will be acked with an error"""