From 811e449357dda9d2b874459c29c5d86e8c2997da Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Apr 2023 19:16:14 +0300 Subject: [PATCH] feat(ft-conf): provide global killswitch --- apps/emqx_ft/src/emqx_ft.erl | 9 +- apps/emqx_ft/src/emqx_ft_app.erl | 2 - apps/emqx_ft/src/emqx_ft_conf.erl | 34 ++++- apps/emqx_ft/src/emqx_ft_schema.erl | 22 ++-- apps/emqx_ft/test/emqx_ft_SUITE.erl | 1 + apps/emqx_ft/test/emqx_ft_api_SUITE.erl | 12 +- apps/emqx_ft/test/emqx_ft_conf_SUITE.erl | 117 ++++++++++++++---- .../emqx_ft/test/emqx_ft_storage_fs_SUITE.erl | 12 +- apps/emqx_ft/test/emqx_ft_test_helpers.erl | 2 +- rel/i18n/emqx_ft_schema.hocon | 6 + 10 files changed, 148 insertions(+), 69 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index d500a6344..42611e537 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -198,8 +198,7 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> end. on_init(PacketId, Msg, Transfer, Meta) -> - ?SLOG(info, #{ - msg => "on_init", + ?tp(info, "file_transfer_init", #{ mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, @@ -229,8 +228,7 @@ on_abort(_Msg, _FileId) -> ?RC_SUCCESS. on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> - ?SLOG(info, #{ - msg => "on_segment", + ?tp(info, "file_transfer_segment", #{ mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, @@ -255,8 +253,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> end). on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> - ?SLOG(info, #{ - msg => "on_fin", + ?tp(info, "file_transfer_fin", #{ mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 9b1513b46..0bac6b592 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -22,11 +22,9 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), - ok = emqx_ft:hook(), ok = emqx_ft_conf:load(), {ok, Sup}. stop(_State) -> ok = emqx_ft_conf:unload(), - ok = emqx_ft:unhook(), ok. diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 1e531ecdb..90b59c8d1 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/logger.hrl"). %% Accessors +-export([enabled/0]). -export([storage/0]). -export([gc_interval/1]). -export([segments_ttl/1]). @@ -49,6 +50,10 @@ %% Accessors %%-------------------------------------------------------------------- +-spec enabled() -> boolean(). +enabled() -> + emqx_config:get([file_transfer, enable], false). + -spec storage() -> _Storage. storage() -> emqx_config:get([file_transfer, storage], undefined). @@ -83,7 +88,7 @@ store_segment_timeout() -> -spec load() -> ok. load() -> - ok = emqx_ft_storage:on_config_update(undefined, storage()), + ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})), emqx_conf:add_handler([file_transfer], ?MODULE). -spec unload() -> ok. @@ -107,7 +112,26 @@ pre_config_update(_, Req, _Config) -> emqx_config:app_envs() ) -> ok | {ok, Result :: any()} | {error, Reason :: term()}. -post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) -> - OldStorageConfig = maps:get(storage, OldConfig, undefined), - NewStorageConfig = maps:get(storage, NewConfig, undefined), - emqx_ft_storage:on_config_update(OldStorageConfig, NewStorageConfig). +post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) -> + on_config_update(OldConfig, NewConfig). + +on_config_update(OldConfig, NewConfig) -> + lists:foreach( + fun(ConfKey) -> + on_config_update( + ConfKey, + maps:get(ConfKey, OldConfig, undefined), + maps:get(ConfKey, NewConfig, undefined) + ) + end, + [storage, enable] + ). + +on_config_update(_, Config, Config) -> + ok; +on_config_update(storage, OldConfig, NewConfig) -> + ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig); +on_config_update(enable, _, true) -> + ok = emqx_ft:hook(); +on_config_update(enable, _, false) -> + ok = emqx_ft:unhook(). diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 27c593b6c..e2eebbbb8 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -55,6 +55,15 @@ roots() -> [file_transfer]. fields(file_transfer) -> [ + {enable, + mk( + boolean(), + #{ + desc => ?DESC("enable"), + required => false, + default => false + } + )}, {init_timeout, mk( emqx_schema:duration_ms(), @@ -87,22 +96,19 @@ fields(file_transfer) -> hoconsc:union( fun (all_union_members) -> - [ - % NOTE: by default storage is disabled - undefined, - ref(local_storage) - ]; + [ref(local_storage)]; ({value, #{<<"type">> := <<"local">>}}) -> [ref(local_storage)]; ({value, #{<<"type">> := _}}) -> throw(#{field_name => type, expected => "local"}); - (_) -> - [undefined] + ({value, _}) -> + [ref(local_storage)] end ), #{ required => false, - desc => ?DESC("storage") + desc => ?DESC("storage"), + default => #{<<"type">> => <<"local">>} } )} ]; diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 151b8e5fe..d3a3aee21 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -63,6 +63,7 @@ set_special_configs(Config) -> % NOTE % Inhibit local fs GC to simulate it isn't fast enough to collect % complete transfers. + enable => true, storage => emqx_utils_maps:deep_merge( Storage, #{segments => #{gc => #{interval => 0}}} diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 5f3b213fb..523026d5a 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -30,7 +30,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> ok = emqx_mgmt_api_test_util:init_suite( - [emqx_conf, emqx_ft], set_special_configs(Config) + [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config) ), {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. @@ -38,16 +38,6 @@ end_per_suite(_Config) -> ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]), ok. -set_special_configs(Config) -> - fun - (emqx_ft) -> - emqx_ft_test_helpers:load_config(#{ - storage => emqx_ft_test_helpers:local_storage(Config) - }); - (_) -> - ok - end. - init_per_testcase(Case, Config) -> [{tc, Case} | Config]. end_per_testcase(_Case, _Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 89b0e895d..106c34702 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -26,22 +26,21 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), - ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config) - ), - {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), ok. init_per_testcase(_Case, Config) -> + % NOTE: running each testcase with clean config + _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end), + {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. end_per_testcase(_Case, _Config) -> - ok. + ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + ok = emqx_config:erase(file_transfer). %%-------------------------------------------------------------------- %% Tests @@ -61,6 +60,7 @@ t_update_config(_Config) -> emqx_conf:update( [file_transfer], #{ + <<"enable">> => true, <<"storage">> => #{ <<"type">> => <<"local">>, <<"segments">> => #{ @@ -87,10 +87,14 @@ t_update_config(_Config) -> emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) ). -t_remove_restore_config(Config) -> +t_disable_restore_config(Config) -> ?assertMatch( {ok, _}, - emqx_conf:update([file_transfer, storage], #{<<"type">> => <<"local">>}, #{}) + emqx_conf:update( + [file_transfer], + #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}}, + #{} + ) ), ?assertEqual( 60 * 60 * 1000, @@ -98,24 +102,29 @@ t_remove_restore_config(Config) -> ), % Verify that transfers work ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>), + % Verify that clearing storage settings reverts config to defaults ?assertMatch( {ok, _}, - emqx_conf:remove([file_transfer, storage], #{}) + emqx_conf:update( + [file_transfer], + #{<<"enable">> => false, <<"storage">> => undefined}, + #{} + ) ), ?assertEqual( - undefined, + false, + emqx_ft_conf:enabled() + ), + ?assertMatch( + #{type := local, exporter := #{type := local}}, emqx_ft_conf:storage() ), - ?assertEqual( - undefined, - emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) - ), ClientId = gen_clientid(), Client = emqx_ft_test_helpers:start_client(ClientId), % Verify that transfers fail cleanly when storage is disabled ?check_trace( ?assertMatch( - {ok, #{reason_code_name := unspecified_error}}, + {ok, #{reason_code_name := no_matching_subscribers}}, emqtt:publish( Client, <<"$file/f2/init">>, @@ -124,23 +133,24 @@ t_remove_restore_config(Config) -> ) ), fun(Trace) -> - ?assertMatch( - [#{transfer := {ClientId, <<"f2">>}, reason := {error, disabled}}], - ?of_kind("store_filemeta_failed", Trace) - ) + ?assertMatch([], ?of_kind("file_transfer_init", Trace)) end ), + ok = emqtt:stop(Client), % Restore local storage backend Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])), ?assertMatch( {ok, _}, emqx_conf:update( - [file_transfer, storage], + [file_transfer], #{ - <<"type">> => <<"local">>, - <<"segments">> => #{ - <<"root">> => Root, - <<"gc">> => #{<<"interval">> => <<"1s">>} + <<"enable">> => true, + <<"storage">> => #{ + <<"type">> => <<"local">>, + <<"segments">> => #{ + <<"root">> => Root, + <<"gc">> => #{<<"interval">> => <<"1s">>} + } } }, #{} @@ -167,5 +177,62 @@ t_remove_restore_config(Config) -> % Verify that transfers work again ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>). +t_switch_exporter(_Config) -> + ?assertMatch( + {ok, _}, + emqx_conf:update( + [file_transfer], + #{<<"enable">> => true}, + #{} + ) + ), + ?assertMatch( + #{type := local, exporter := #{type := local}}, + emqx_ft_conf:storage() + ), + % Verify that switching to a different exporter works + ?assertMatch( + {ok, _}, + emqx_conf:update( + [file_transfer, storage, exporter], + #{ + <<"type">> => <<"s3">>, + <<"bucket">> => <<"emqx">>, + <<"host">> => <<"https://localhost">>, + <<"port">> => 9000, + <<"transport_options">> => #{ + <<"ipv6_probe">> => false + } + }, + #{} + ) + ), + ?assertMatch( + #{type := local, exporter := #{type := s3}}, + emqx_ft_conf:storage() + ), + % Verify that switching back to local exporter works + ?assertMatch( + {ok, _}, + emqx_conf:remove( + [file_transfer, storage, exporter], + #{} + ) + ), + ?assertMatch( + {ok, _}, + emqx_conf:update( + [file_transfer, storage, exporter], + #{<<"type">> => <<"local">>}, + #{} + ) + ), + ?assertMatch( + #{type := local, exporter := #{type := local}}, + emqx_ft_conf:storage() + ), + % Verify that transfers work + ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>). + gen_clientid() -> emqx_base62:encode(emqx_guid:gen()). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index e3decf0f5..d4c13f7d1 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -35,22 +35,12 @@ groups() -> ]. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)), + ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)), Config. end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_ft]), ok. -set_special_configs(Config) -> - fun - (emqx_ft) -> - emqx_ft_test_helpers:load_config(#{ - storage => emqx_ft_test_helpers:local_storage(Config) - }); - (_) -> - ok - end. - init_per_testcase(Case, Config) -> [{tc, Case} | Config]. end_per_testcase(_Case, _Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 11ddf191b..89e349fae 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -41,7 +41,7 @@ stop_additional_node(Node) -> env_handler(Config) -> fun (emqx_ft) -> - load_config(#{storage => local_storage(Config)}); + load_config(#{enable => true, storage => local_storage(Config)}); (_) -> ok end. diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon index 28c93e1ef..e7e551289 100644 --- a/rel/i18n/emqx_ft_schema.hocon +++ b/rel/i18n/emqx_ft_schema.hocon @@ -1,5 +1,11 @@ emqx_ft_schema { +enable.desc: +"""Enable the File Transfer feature.
+Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.
+This toggle does not have an effect neither on the availability of the File Transfer REST API, nor +on storage-dependent background activities (e.g. garbage collection).""" + init_timeout.desc: """Timeout for initializing the file transfer.
After reaching the timeout, `init` message will be acked with an error"""