feat(ft-conf): provide global killswitch

This commit is contained in:
Andrew Mayorov 2023-04-25 19:16:14 +03:00
parent a420c92d28
commit 811e449357
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
10 changed files with 148 additions and 69 deletions

View File

@ -198,8 +198,7 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
end. end.
on_init(PacketId, Msg, Transfer, Meta) -> on_init(PacketId, Msg, Transfer, Meta) ->
?SLOG(info, #{ ?tp(info, "file_transfer_init", #{
msg => "on_init",
mqtt_msg => Msg, mqtt_msg => Msg,
packet_id => PacketId, packet_id => PacketId,
transfer => Transfer, transfer => Transfer,
@ -229,8 +228,7 @@ on_abort(_Msg, _FileId) ->
?RC_SUCCESS. ?RC_SUCCESS.
on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
?SLOG(info, #{ ?tp(info, "file_transfer_segment", #{
msg => "on_segment",
mqtt_msg => Msg, mqtt_msg => Msg,
packet_id => PacketId, packet_id => PacketId,
transfer => Transfer, transfer => Transfer,
@ -255,8 +253,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end). end).
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
?SLOG(info, #{ ?tp(info, "file_transfer_fin", #{
msg => "on_fin",
mqtt_msg => Msg, mqtt_msg => Msg,
packet_id => PacketId, packet_id => PacketId,
transfer => Transfer, transfer => Transfer,

View File

@ -22,11 +22,9 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_ft_sup:start_link(), {ok, Sup} = emqx_ft_sup:start_link(),
ok = emqx_ft:hook(),
ok = emqx_ft_conf:load(), ok = emqx_ft_conf:load(),
{ok, Sup}. {ok, Sup}.
stop(_State) -> stop(_State) ->
ok = emqx_ft_conf:unload(), ok = emqx_ft_conf:unload(),
ok = emqx_ft:unhook(),
ok. ok.

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% Accessors %% Accessors
-export([enabled/0]).
-export([storage/0]). -export([storage/0]).
-export([gc_interval/1]). -export([gc_interval/1]).
-export([segments_ttl/1]). -export([segments_ttl/1]).
@ -49,6 +50,10 @@
%% Accessors %% Accessors
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec enabled() -> boolean().
enabled() ->
emqx_config:get([file_transfer, enable], false).
-spec storage() -> _Storage. -spec storage() -> _Storage.
storage() -> storage() ->
emqx_config:get([file_transfer, storage], undefined). emqx_config:get([file_transfer, storage], undefined).
@ -83,7 +88,7 @@ store_segment_timeout() ->
-spec load() -> ok. -spec load() -> ok.
load() -> load() ->
ok = emqx_ft_storage:on_config_update(undefined, storage()), ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
emqx_conf:add_handler([file_transfer], ?MODULE). emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok. -spec unload() -> ok.
@ -107,7 +112,26 @@ pre_config_update(_, Req, _Config) ->
emqx_config:app_envs() emqx_config:app_envs()
) -> ) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}. ok | {ok, Result :: any()} | {error, Reason :: term()}.
post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) -> post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
OldStorageConfig = maps:get(storage, OldConfig, undefined), on_config_update(OldConfig, NewConfig).
NewStorageConfig = maps:get(storage, NewConfig, undefined),
emqx_ft_storage:on_config_update(OldStorageConfig, NewStorageConfig). on_config_update(OldConfig, NewConfig) ->
lists:foreach(
fun(ConfKey) ->
on_config_update(
ConfKey,
maps:get(ConfKey, OldConfig, undefined),
maps:get(ConfKey, NewConfig, undefined)
)
end,
[storage, enable]
).
on_config_update(_, Config, Config) ->
ok;
on_config_update(storage, OldConfig, NewConfig) ->
ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
on_config_update(enable, _, true) ->
ok = emqx_ft:hook();
on_config_update(enable, _, false) ->
ok = emqx_ft:unhook().

View File

@ -55,6 +55,15 @@ roots() -> [file_transfer].
fields(file_transfer) -> fields(file_transfer) ->
[ [
{enable,
mk(
boolean(),
#{
desc => ?DESC("enable"),
required => false,
default => false
}
)},
{init_timeout, {init_timeout,
mk( mk(
emqx_schema:duration_ms(), emqx_schema:duration_ms(),
@ -87,22 +96,19 @@ fields(file_transfer) ->
hoconsc:union( hoconsc:union(
fun fun
(all_union_members) -> (all_union_members) ->
[ [ref(local_storage)];
% NOTE: by default storage is disabled
undefined,
ref(local_storage)
];
({value, #{<<"type">> := <<"local">>}}) -> ({value, #{<<"type">> := <<"local">>}}) ->
[ref(local_storage)]; [ref(local_storage)];
({value, #{<<"type">> := _}}) -> ({value, #{<<"type">> := _}}) ->
throw(#{field_name => type, expected => "local"}); throw(#{field_name => type, expected => "local"});
(_) -> ({value, _}) ->
[undefined] [ref(local_storage)]
end end
), ),
#{ #{
required => false, required => false,
desc => ?DESC("storage") desc => ?DESC("storage"),
default => #{<<"type">> => <<"local">>}
} }
)} )}
]; ];

View File

@ -63,6 +63,7 @@ set_special_configs(Config) ->
% NOTE % NOTE
% Inhibit local fs GC to simulate it isn't fast enough to collect % Inhibit local fs GC to simulate it isn't fast enough to collect
% complete transfers. % complete transfers.
enable => true,
storage => emqx_utils_maps:deep_merge( storage => emqx_utils_maps:deep_merge(
Storage, Storage,
#{segments => #{gc => #{interval => 0}}} #{segments => #{gc => #{interval => 0}}}

View File

@ -30,7 +30,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:init_suite( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_ft], set_special_configs(Config) [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
), ),
{ok, _} = emqx:update_config([rpc, port_discovery], manual), {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config. Config.
@ -38,16 +38,6 @@ end_per_suite(_Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]), ok = emqx_mgmt_api_test_util:end_suite([emqx_ft, emqx_conf]),
ok. ok.
set_special_configs(Config) ->
fun
(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end.
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
[{tc, Case} | Config]. [{tc, Case} | Config].
end_per_testcase(_Case, _Config) -> end_per_testcase(_Case, _Config) ->

View File

@ -26,22 +26,21 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok. ok.
init_per_testcase(_Case, Config) -> init_per_testcase(_Case, Config) ->
% NOTE: running each testcase with clean config
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config. Config.
end_per_testcase(_Case, _Config) -> end_per_testcase(_Case, _Config) ->
ok. ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok = emqx_config:erase(file_transfer).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Tests %% Tests
@ -61,6 +60,7 @@ t_update_config(_Config) ->
emqx_conf:update( emqx_conf:update(
[file_transfer], [file_transfer],
#{ #{
<<"enable">> => true,
<<"storage">> => #{ <<"storage">> => #{
<<"type">> => <<"local">>, <<"type">> => <<"local">>,
<<"segments">> => #{ <<"segments">> => #{
@ -87,10 +87,14 @@ t_update_config(_Config) ->
emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
). ).
t_remove_restore_config(Config) -> t_disable_restore_config(Config) ->
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_conf:update([file_transfer, storage], #{<<"type">> => <<"local">>}, #{}) emqx_conf:update(
[file_transfer],
#{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
#{}
)
), ),
?assertEqual( ?assertEqual(
60 * 60 * 1000, 60 * 60 * 1000,
@ -98,24 +102,29 @@ t_remove_restore_config(Config) ->
), ),
% Verify that transfers work % Verify that transfers work
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>), ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
% Verify that clearing storage settings reverts config to defaults
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_conf:remove([file_transfer, storage], #{}) emqx_conf:update(
[file_transfer],
#{<<"enable">> => false, <<"storage">> => undefined},
#{}
)
), ),
?assertEqual( ?assertEqual(
undefined, false,
emqx_ft_conf:enabled()
),
?assertMatch(
#{type := local, exporter := #{type := local}},
emqx_ft_conf:storage() emqx_ft_conf:storage()
), ),
?assertEqual(
undefined,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
),
ClientId = gen_clientid(), ClientId = gen_clientid(),
Client = emqx_ft_test_helpers:start_client(ClientId), Client = emqx_ft_test_helpers:start_client(ClientId),
% Verify that transfers fail cleanly when storage is disabled % Verify that transfers fail cleanly when storage is disabled
?check_trace( ?check_trace(
?assertMatch( ?assertMatch(
{ok, #{reason_code_name := unspecified_error}}, {ok, #{reason_code_name := no_matching_subscribers}},
emqtt:publish( emqtt:publish(
Client, Client,
<<"$file/f2/init">>, <<"$file/f2/init">>,
@ -124,24 +133,25 @@ t_remove_restore_config(Config) ->
) )
), ),
fun(Trace) -> fun(Trace) ->
?assertMatch( ?assertMatch([], ?of_kind("file_transfer_init", Trace))
[#{transfer := {ClientId, <<"f2">>}, reason := {error, disabled}}],
?of_kind("store_filemeta_failed", Trace)
)
end end
), ),
ok = emqtt:stop(Client),
% Restore local storage backend % Restore local storage backend
Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])), Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_conf:update( emqx_conf:update(
[file_transfer, storage], [file_transfer],
#{ #{
<<"enable">> => true,
<<"storage">> => #{
<<"type">> => <<"local">>, <<"type">> => <<"local">>,
<<"segments">> => #{ <<"segments">> => #{
<<"root">> => Root, <<"root">> => Root,
<<"gc">> => #{<<"interval">> => <<"1s">>} <<"gc">> => #{<<"interval">> => <<"1s">>}
} }
}
}, },
#{} #{}
) )
@ -167,5 +177,62 @@ t_remove_restore_config(Config) ->
% Verify that transfers work again % Verify that transfers work again
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>). ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
t_switch_exporter(_Config) ->
?assertMatch(
{ok, _},
emqx_conf:update(
[file_transfer],
#{<<"enable">> => true},
#{}
)
),
?assertMatch(
#{type := local, exporter := #{type := local}},
emqx_ft_conf:storage()
),
% Verify that switching to a different exporter works
?assertMatch(
{ok, _},
emqx_conf:update(
[file_transfer, storage, exporter],
#{
<<"type">> => <<"s3">>,
<<"bucket">> => <<"emqx">>,
<<"host">> => <<"https://localhost">>,
<<"port">> => 9000,
<<"transport_options">> => #{
<<"ipv6_probe">> => false
}
},
#{}
)
),
?assertMatch(
#{type := local, exporter := #{type := s3}},
emqx_ft_conf:storage()
),
% Verify that switching back to local exporter works
?assertMatch(
{ok, _},
emqx_conf:remove(
[file_transfer, storage, exporter],
#{}
)
),
?assertMatch(
{ok, _},
emqx_conf:update(
[file_transfer, storage, exporter],
#{<<"type">> => <<"local">>},
#{}
)
),
?assertMatch(
#{type := local, exporter := #{type := local}},
emqx_ft_conf:storage()
),
% Verify that transfers work
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
gen_clientid() -> gen_clientid() ->
emqx_base62:encode(emqx_guid:gen()). emqx_base62:encode(emqx_guid:gen()).

View File

@ -35,22 +35,12 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_ft], set_special_configs(Config)), ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft]), ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
ok. ok.
set_special_configs(Config) ->
fun
(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
storage => emqx_ft_test_helpers:local_storage(Config)
});
(_) ->
ok
end.
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
[{tc, Case} | Config]. [{tc, Case} | Config].
end_per_testcase(_Case, _Config) -> end_per_testcase(_Case, _Config) ->

View File

@ -41,7 +41,7 @@ stop_additional_node(Node) ->
env_handler(Config) -> env_handler(Config) ->
fun fun
(emqx_ft) -> (emqx_ft) ->
load_config(#{storage => local_storage(Config)}); load_config(#{enable => true, storage => local_storage(Config)});
(_) -> (_) ->
ok ok
end. end.

View File

@ -1,5 +1,11 @@
emqx_ft_schema { emqx_ft_schema {
enable.desc:
"""Enable the File Transfer feature.<br/>
Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.<br/>
This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
on storage-dependent background activities (e.g. garbage collection)."""
init_timeout.desc: init_timeout.desc:
"""Timeout for initializing the file transfer.<br/> """Timeout for initializing the file transfer.<br/>
After reaching the timeout, `init` message will be acked with an error""" After reaching the timeout, `init` message will be acked with an error"""