diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 61f639271..74f2df27d 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -54,24 +54,26 @@ enabled() -> emqx_config:get([file_transfer, enable], false). --spec storage() -> _Storage. +-spec storage() -> emqx_config:config(). storage() -> emqx_config:get([file_transfer, storage]). --spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()). -gc_interval(Conf = #{type := local}) -> - emqx_utils_maps:deep_get([segments, gc, interval], Conf); -gc_interval(_) -> - undefined. +-spec gc_interval(emqx_ft_storage_fs:storage()) -> + emqx_maybe:t(milliseconds()). +gc_interval(Storage) -> + emqx_utils_maps:deep_get([segments, gc, interval], Storage, undefined). --spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}). -segments_ttl(Conf = #{type := local}) -> - { - emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf), - emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf) - }; -segments_ttl(_) -> - undefined. +-spec segments_ttl(emqx_ft_storage_fs:storage()) -> + emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}). +segments_ttl(Storage) -> + Min = emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Storage, undefined), + Max = emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Storage, undefined), + case is_integer(Min) andalso is_integer(Max) of + true -> + {Min, Max}; + false -> + undefined + end. init_timeout() -> emqx_config:get([file_transfer, init_timeout]). diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index e2eebbbb8..26eedb1d6 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -93,36 +93,30 @@ fields(file_transfer) -> )}, {storage, mk( - hoconsc:union( - fun - (all_union_members) -> - [ref(local_storage)]; - ({value, #{<<"type">> := <<"local">>}}) -> - [ref(local_storage)]; - ({value, #{<<"type">> := _}}) -> - throw(#{field_name => type, expected => "local"}); - ({value, _}) -> - [ref(local_storage)] - end - ), + ref(storage_backend), #{ + desc => ?DESC("storage_backend"), required => false, - desc => ?DESC("storage"), - default => #{<<"type">> => <<"local">>} + validator => validator(backend), + default => #{ + <<"local">> => #{} + } + } + )} + ]; +fields(storage_backend) -> + [ + {local, + mk( + ref(local_storage), + #{ + desc => ?DESC("local_storage"), + required => {false, recursively} } )} ]; fields(local_storage) -> [ - {type, - mk( - local, - #{ - default => local, - required => false, - desc => ?DESC("local_type") - } - )}, {segments, mk( ref(local_storage_segments), @@ -136,29 +130,13 @@ fields(local_storage) -> )}, {exporter, mk( - hoconsc:union( - fun - (all_union_members) -> - [ - ref(local_storage_exporter), - ref(s3_exporter) - ]; - ({value, #{<<"type">> := <<"local">>}}) -> - [ref(local_storage_exporter)]; - ({value, #{<<"type">> := <<"s3">>}}) -> - [ref(s3_exporter)]; - ({value, #{<<"type">> := _}}) -> - throw(#{field_name => type, expected => "local | s3"}); - ({value, _}) -> - % NOTE: default - [ref(local_storage_exporter)] - end - ), + ref(local_storage_exporter_backend), #{ - desc => ?DESC("local_storage_exporter"), - required => true, + desc => ?DESC("local_storage_exporter_backend"), + required => false, + validator => validator(backend), default => #{ - <<"type">> => <<"local">> + <<"local">> => #{} } } )} @@ -181,17 +159,27 @@ fields(local_storage_segments) -> } )} ]; -fields(local_storage_exporter) -> +fields(local_storage_exporter_backend) -> [ - {type, + {local, mk( - local, + ref(local_storage_exporter), #{ - default => local, - required => false, - desc => ?DESC("local_storage_exporter_type") + desc => ?DESC("local_storage_exporter"), + required => {false, recursively} } )}, + {s3, + mk( + ref(s3_exporter), + #{ + desc => ?DESC("s3_exporter"), + required => {false, recursively} + } + )} + ]; +fields(local_storage_exporter) -> + [ {root, mk( binary(), @@ -202,18 +190,7 @@ fields(local_storage_exporter) -> )} ]; fields(s3_exporter) -> - [ - {type, - mk( - s3, - #{ - default => s3, - required => false, - desc => ?DESC("s3_exporter_type") - } - )} - ] ++ - emqx_s3_schema:fields(s3); + emqx_s3_schema:fields(s3); fields(local_storage_segments_gc) -> [ {interval, @@ -260,6 +237,10 @@ desc(s3_exporter) -> "S3 Exporter settings for the File transfer local storage backend"; desc(local_storage_segments_gc) -> "Garbage collection settings for the File transfer local segments storage"; +desc(local_storage_exporter_backend) -> + "Exporter for the local file system storage backend"; +desc(storage_backend) -> + "Storage backend settings for file transfer"; desc(_) -> undefined. @@ -287,7 +268,16 @@ validator(filename) -> byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded} end, fun emqx_ft_fs_util:is_filename_safe/1 - ]. + ]; +validator(backend) -> + fun(Config) -> + case maps:keys(Config) of + [_Type] -> + ok; + _Conflicts = [_ | _] -> + {error, multiple_conflicting_backends} + end + end. converter(checksum) -> fun diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 5ec342585..d35229bfc 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -18,8 +18,6 @@ -export( [ - child_spec/0, - store_filemeta/2, store_segment/2, assemble/2, @@ -30,11 +28,17 @@ with_storage_type/2, with_storage_type/3, + backend/0, on_config_update/2 ] ). --type storage() :: emqx_config:config(). +-type type() :: local. +-type backend() :: {type(), storage()}. +-type storage() :: config(). +-type config() :: emqx_config:config(). + +-export_type([backend/0]). -export_type([assemble_callback/0]). @@ -100,31 +104,20 @@ %% API %%-------------------------------------------------------------------- --spec child_spec() -> - [supervisor:child_spec()]. -child_spec() -> - try - Mod = mod(), - Mod:child_spec(storage()) - catch - error:disabled -> []; - error:undef -> [] - end. - -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> ok | {async, pid()} | {error, term()}. store_filemeta(Transfer, FileMeta) -> - with_storage(store_filemeta, [Transfer, FileMeta]). + dispatch(store_filemeta, [Transfer, FileMeta]). -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. store_segment(Transfer, Segment) -> - with_storage(store_segment, [Transfer, Segment]). + dispatch(store_segment, [Transfer, Segment]). -spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> ok | {async, pid()} | {error, term()}. assemble(Transfer, Size) -> - with_storage(assemble, [Transfer, Size]). + dispatch(assemble, [Transfer, Size]). -spec files() -> {ok, page(file_info(), _)} | {error, term()}. @@ -134,20 +127,14 @@ files() -> -spec files(query(Cursor)) -> {ok, page(file_info(), Cursor)} | {error, term()}. files(Query) -> - with_storage(files, [Query]). + dispatch(files, [Query]). --spec with_storage(atom() | function()) -> any(). -with_storage(Fun) -> - with_storage(Fun, []). +-spec dispatch(atom(), list(term())) -> any(). +dispatch(Fun, Args) when is_atom(Fun) -> + {Type, Storage} = backend(), + apply(mod(Type), Fun, [Storage | Args]). --spec with_storage(atom() | function(), list(term())) -> any(). -with_storage(Fun, Args) -> - case storage() of - Storage = #{} -> - apply_storage(Storage, Fun, Args); - undefined -> - {error, disabled} - end. +%% -spec with_storage_type(atom(), atom() | function()) -> any(). with_storage_type(Type, Fun) -> @@ -155,56 +142,54 @@ with_storage_type(Type, Fun) -> -spec with_storage_type(atom(), atom() | function(), list(term())) -> any(). with_storage_type(Type, Fun, Args) -> - with_storage(fun(Storage) -> - case Storage of - #{type := Type} -> - apply_storage(Storage, Fun, Args); - _ -> - {error, {invalid_storage_type, Storage}} - end - end). - -apply_storage(Storage, Fun, Args) when is_atom(Fun) -> - apply(mod(Storage), Fun, [Storage | Args]); -apply_storage(Storage, Fun, Args) when is_function(Fun) -> - apply(Fun, [Storage | Args]). + case backend() of + {Type, Storage} when is_atom(Fun) -> + apply(mod(Type), Fun, [Storage | Args]); + {Type, Storage} when is_function(Fun) -> + apply(Fun, [Storage | Args]); + {_, _} = Backend -> + {error, {invalid_storage_backend, Backend}} + end. %% --spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) -> +-spec backend() -> backend(). +backend() -> + backend(emqx_ft_conf:storage()). + +-spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) -> ok. -on_config_update(#{type := _} = Storage, #{type := _} = Storage) -> +on_config_update(ConfigOld, ConfigNew) -> + on_backend_update( + emqx_maybe:apply(fun backend/1, ConfigOld), + emqx_maybe:apply(fun backend/1, ConfigNew) + ). + +on_backend_update({Type, _} = Backend, {Type, _} = Backend) -> ok; -on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) -> - ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew); -on_config_update(StorageOld, StorageNew) when - (StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso - (StorageNew =:= undefined orelse is_map_key(type, StorageNew)) +on_backend_update({Type, StorageOld}, {Type, StorageNew}) -> + ok = (mod(Type)):on_config_update(StorageOld, StorageNew); +on_backend_update(BackendOld, BackendNew) when + (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso + (BackendNew =:= undefined orelse is_tuple(BackendNew)) -> - _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld), - _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew), + _ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld), + _ = emqx_maybe:apply(fun on_storage_start/1, BackendNew), ok. %%-------------------------------------------------------------------- %% Local API %%-------------------------------------------------------------------- -on_storage_start(Storage) -> - (mod(Storage)):start(Storage). +-spec backend(config()) -> backend(). +backend(#{local := Storage}) -> + {local, Storage}. -on_storage_stop(Storage) -> - (mod(Storage)):stop(Storage). +on_storage_start({Type, Storage}) -> + (mod(Type)):start(Storage). -storage() -> - emqx_ft_conf:storage(). +on_storage_stop({Type, Storage}) -> + (mod(Type)):stop(Storage). -mod() -> - mod(storage()). - -mod(Storage) -> - case Storage of - #{type := local} -> - emqx_ft_storage_fs; - undefined -> - error(disabled) - end. +mod(local) -> + emqx_ft_storage_fs. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index e000fe5c6..e25ab158e 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -169,15 +169,12 @@ stop({ExporterMod, ExporterOpts}) -> exporter(Storage) -> case maps:get(exporter, Storage) of - #{type := local} = Options -> - {emqx_ft_storage_exporter_fs, without_type(Options)}; - #{type := s3} = Options -> - {emqx_ft_storage_exporter_s3, without_type(Options)} + #{local := Options} -> + {emqx_ft_storage_exporter_fs, Options}; + #{s3 := Options} -> + {emqx_ft_storage_exporter_s3, Options} end. -without_type(#{type := _} = Options) -> - maps:without([type], Options). - init_checksum(#{checksum := {Algo, _}}) -> crypto:hash_init(Algo); init_checksum(#{}) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 9109dadbb..9f77c8afb 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -257,6 +257,9 @@ read_exportinfo( Transfer = dirnames_to_transfer(ClientId, FileId), Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo), [Info | Acc]; +read_exportinfo(_Options, {node, _Root = "", {error, enoent}, []}, Acc) -> + % NOTE: Root directory does not exist, this is not an error. + Acc; read_exportinfo(Options, Entry, Acc) -> ok = log_invalid_entry(Options, Entry), Acc. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl index 692a270e3..713649759 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -58,9 +58,9 @@ start_link(Storage) -> collect() -> gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity). --spec reset() -> ok. +-spec reset() -> ok | {error, _}. reset() -> - reset(emqx_ft_conf:storage()). + emqx_ft_storage:with_storage_type(local, fun reset/1). -spec reset(emqx_ft_storage_fs:storage()) -> ok. reset(Storage) -> @@ -139,7 +139,8 @@ maybe_report(#gcstats{} = _Stats, _Storage) -> ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}). start_timer(St) -> - start_timer(gc_interval(emqx_ft_conf:storage()), St). + Interval = emqx_ft_storage:with_storage_type(local, fun gc_interval/1), + start_timer(Interval, St). start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) -> St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)}; diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 929665ca9..6b675e0c0 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -58,16 +58,16 @@ end_per_suite(_Config) -> set_special_configs(Config) -> fun (emqx_ft) -> - Storage = emqx_ft_test_helpers:local_storage(Config), + % NOTE + % Inhibit local fs GC to simulate it isn't fast enough to collect + % complete transfers. + Storage = emqx_utils_maps:deep_merge( + emqx_ft_test_helpers:local_storage(Config), + #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} + ), emqx_ft_test_helpers:load_config(#{ - % NOTE - % Inhibit local fs GC to simulate it isn't fast enough to collect - % complete transfers. - enable => true, - storage => emqx_utils_maps:deep_merge( - Storage, - #{segments => #{gc => #{interval => 0}}} - ) + <<"enable">> => true, + <<"storage">> => Storage }); (_) -> ok diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 24a4593c2..c1deeb3bc 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -246,16 +246,19 @@ exporter(Config) -> emqx_ft_storage_exporter:exporter(storage(Config)). storage(Config) -> - maps:get( - storage, + emqx_utils_maps:deep_get( + [storage, local], emqx_ft_schema:translate(#{ <<"storage">> => #{ - <<"type">> => <<"local">>, - <<"segments">> => #{ - <<"root">> => ?config(storage_root, Config) - }, - <<"exporter">> => #{ - <<"root">> => ?config(exports_root, Config) + <<"local">> => #{ + <<"segments">> => #{ + <<"root">> => ?config(storage_root, Config) + }, + <<"exporter">> => #{ + <<"local">> => #{ + <<"root">> => ?config(exports_root, Config) + } + } } } }) diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index bc9eb5d98..1f53f88af 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -34,7 +34,12 @@ end_per_suite(_Config) -> init_per_testcase(_Case, Config) -> _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config) + [emqx_conf, emqx_ft], fun + (emqx_ft) -> + emqx_ft_test_helpers:load_config(#{}); + (_) -> + ok + end ), {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. @@ -52,7 +57,7 @@ t_update_config(_Config) -> {error, #{kind := validation_error}}, emqx_conf:update( [file_transfer], - #{<<"storage">> => #{<<"type">> => <<"unknown">>}}, + #{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}}, #{} ) ), @@ -63,16 +68,18 @@ t_update_config(_Config) -> #{ <<"enable">> => true, <<"storage">> => #{ - <<"type">> => <<"local">>, - <<"segments">> => #{ - <<"root">> => <<"/tmp/path">>, - <<"gc">> => #{ - <<"interval">> => <<"5m">> + <<"local">> => #{ + <<"segments">> => #{ + <<"root">> => <<"/tmp/path">>, + <<"gc">> => #{ + <<"interval">> => <<"5m">> + } + }, + <<"exporter">> => #{ + <<"local">> => #{ + <<"root">> => <<"/tmp/exports">> + } } - }, - <<"exporter">> => #{ - <<"type">> => <<"local">>, - <<"root">> => <<"/tmp/exports">> } } }, @@ -81,11 +88,15 @@ t_update_config(_Config) -> ), ?assertEqual( <<"/tmp/path">>, - emqx_config:get([file_transfer, storage, segments, root]) + emqx_config:get([file_transfer, storage, local, segments, root]) ), ?assertEqual( 5 * 60 * 1000, - emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) + emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1) + ), + ?assertEqual( + {5 * 60, 24 * 60 * 60}, + emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:segments_ttl/1) ). t_disable_restore_config(Config) -> @@ -93,13 +104,13 @@ t_disable_restore_config(Config) -> {ok, _}, emqx_conf:update( [file_transfer], - #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}}, + #{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}}, #{} ) ), ?assertEqual( 60 * 60 * 1000, - emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) + emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1) ), % Verify that transfers work ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>), @@ -117,7 +128,7 @@ t_disable_restore_config(Config) -> emqx_ft_conf:enabled() ), ?assertMatch( - #{type := local, exporter := #{type := local}}, + #{local := #{exporter := #{local := _}}}, emqx_ft_conf:storage() ), ClientId = gen_clientid(), @@ -147,10 +158,11 @@ t_disable_restore_config(Config) -> #{ <<"enable">> => true, <<"storage">> => #{ - <<"type">> => <<"local">>, - <<"segments">> => #{ - <<"root">> => Root, - <<"gc">> => #{<<"interval">> => <<"1s">>} + <<"local">> => #{ + <<"segments">> => #{ + <<"root">> => Root, + <<"gc">> => #{<<"interval">> => <<"1s">>} + } } } }, @@ -165,10 +177,7 @@ t_disable_restore_config(Config) -> [ #{ ?snk_kind := garbage_collection, - storage := #{ - type := local, - segments := #{root := Root} - } + storage := #{segments := #{root := Root}} } ], ?of_kind(garbage_collection, Trace) @@ -188,48 +197,49 @@ t_switch_exporter(_Config) -> ) ), ?assertMatch( - #{type := local, exporter := #{type := local}}, + #{local := #{exporter := #{local := _}}}, emqx_ft_conf:storage() ), % Verify that switching to a different exporter works ?assertMatch( {ok, _}, emqx_conf:update( - [file_transfer, storage, exporter], + [file_transfer, storage, local, exporter], #{ - <<"type">> => <<"s3">>, - <<"bucket">> => <<"emqx">>, - <<"host">> => <<"https://localhost">>, - <<"port">> => 9000, - <<"transport_options">> => #{ - <<"ipv6_probe">> => false + <<"s3">> => #{ + <<"bucket">> => <<"emqx">>, + <<"host">> => <<"https://localhost">>, + <<"port">> => 9000, + <<"transport_options">> => #{ + <<"ipv6_probe">> => false + } } }, #{} ) ), ?assertMatch( - #{type := local, exporter := #{type := s3}}, + #{local := #{exporter := #{s3 := _}}}, emqx_ft_conf:storage() ), % Verify that switching back to local exporter works ?assertMatch( {ok, _}, emqx_conf:remove( - [file_transfer, storage, exporter], + [file_transfer, storage, local, exporter], #{} ) ), ?assertMatch( {ok, _}, emqx_conf:update( - [file_transfer, storage, exporter], - #{<<"type">> => <<"local">>}, + [file_transfer, storage, local, exporter], + #{<<"local">> => #{}}, #{} ) ), ?assertMatch( - #{type := local, exporter := #{type := local}}, + #{local := #{exporter := #{local := #{}}}}, emqx_ft_conf:storage() ), % Verify that transfers work diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl index 86de74ee0..e717fe262 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl @@ -111,7 +111,7 @@ t_upload_error(Config) -> Data = <<"data"/utf8>>, {ok, _} = emqx_conf:update( - [file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{} + [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{} ), ?assertEqual( diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index 2acb57a8e..50925cfb9 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -85,7 +85,7 @@ client_id(Config) -> storage(Config) -> RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)}, - #{storage := Storage} = emqx_ft_schema:translate(RawConfig), + #{storage := #{local := Storage}} = emqx_ft_schema:translate(RawConfig), Storage. list_files(Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 04aedf8f3..a7ffd5675 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -36,19 +36,19 @@ end_per_suite(_Config) -> ok. init_per_testcase(TC, Config) -> + SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]), + ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]), ok = emqx_common_test_helpers:start_app( emqx_ft, fun(emqx_ft) -> emqx_ft_test_helpers:load_config(#{ <<"enable">> => true, <<"storage">> => #{ - <<"type">> => <<"local">>, - <<"segments">> => #{ - <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments]) - }, - <<"exporter">> => #{ - <<"type">> => <<"local">>, - <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) + <<"local">> => #{ + <<"segments">> => #{<<"root">> => SegmentsRoot}, + <<"exporter">> => #{ + <<"local">> => #{<<"root">> => ExportsRoot} + } } } }) @@ -105,7 +105,7 @@ t_gc_triggers_manually(_Config) -> ). t_gc_complete_transfers(_Config) -> - Storage = emqx_ft_conf:storage(), + {local, Storage} = emqx_ft_storage:backend(), ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 3), ok = set_gc_config(interval, 500), @@ -198,7 +198,7 @@ t_gc_complete_transfers(_Config) -> t_gc_incomplete_transfers(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 4), - Storage = emqx_ft_conf:storage(), + {local, Storage} = emqx_ft_storage:backend(), Transfers = [ { {<<"client43"/utf8>>, <<"file-🦕"/utf8>>}, @@ -269,7 +269,7 @@ t_gc_incomplete_transfers(_Config) -> t_gc_handling_errors(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 0), - Storage = emqx_ft_conf:storage(), + {local, Storage} = emqx_ft_storage:backend(), Transfer1 = {<<"client1">>, mk_file_id()}, Transfer2 = {<<"client2">>, mk_file_id()}, Filemeta = #{name => "oops.pdf"}, @@ -325,7 +325,7 @@ t_gc_handling_errors(_Config) -> %% set_gc_config(Name, Value) -> - emqx_config:put([file_transfer, storage, segments, gc, Name], Value). + emqx_config:put([file_transfer, storage, local, segments, gc, Name], Value). start_transfer(Storage, {Transfer, Meta, Gen}) -> ?assertEqual( diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 1482223d8..2eb6d84db 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -54,20 +54,22 @@ local_storage(Config) -> local_storage(Config, Opts) -> #{ - <<"type">> => <<"local">>, - <<"segments">> => #{<<"root">> => root(Config, node(), [segments])}, - <<"exporter">> => exporter(Config, Opts) + <<"local">> => #{ + <<"segments">> => #{<<"root">> => root(Config, node(), [segments])}, + <<"exporter">> => exporter(Config, Opts) + } }. exporter(Config, #{exporter := local}) -> - #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])}; + #{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}}; exporter(_Config, #{exporter := s3, bucket_name := BucketName}) -> BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp), - BaseConfig#{ - <<"bucket">> => list_to_binary(BucketName), - <<"type">> => <<"s3">>, - <<"host">> => ?S3_HOST, - <<"port">> => ?S3_PORT + #{ + <<"s3">> => BaseConfig#{ + <<"bucket">> => list_to_binary(BucketName), + <<"host">> => ?S3_HOST, + <<"port">> => ?S3_PORT + } }. load_config(Config) -> diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon index 64b9bd67e..13bbc6970 100644 --- a/rel/i18n/emqx_ft_schema.hocon +++ b/rel/i18n/emqx_ft_schema.hocon @@ -18,11 +18,11 @@ store_segment_timeout.desc: """Timeout for storing a file segment.
After reaching the timeout, message with the segment will be acked with an error""" -storage.desc: +storage_backend.desc: """Storage settings for file transfer.""" -local_type.desc: -"""Use local file system to store uploaded fragments and temporary data.""" +local_storage.desc: +"""Local file system backend to store uploaded fragments and temporary data.""" local_storage_segments.desc: """Settings for local segments storage, which include uploaded transfer fragments and temporary data.""" @@ -30,15 +30,15 @@ local_storage_segments.desc: local_storage_segments_root.desc: """File system path to keep uploaded fragments and temporary data.""" -local_storage_exporter.desc: +local_storage_exporter_backend.desc: """Exporter for the local file system storage backend.
Exporter defines where and how fully transferred and assembled files are stored.""" -local_storage_exporter_type.desc: -"""Exporter type for the exporter to the local file system""" +local_storage_exporter.desc: +"""Exporter to the local file system.""" -s3_exporter_type.desc: -"""Exporter type for the exporter to S3""" +s3_exporter.desc: +"""Exporter to the S3 API compatible object storage.""" local_storage_exporter_root.desc: """File system path to keep uploaded files."""