diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl
index 61f639271..74f2df27d 100644
--- a/apps/emqx_ft/src/emqx_ft_conf.erl
+++ b/apps/emqx_ft/src/emqx_ft_conf.erl
@@ -54,24 +54,26 @@
enabled() ->
emqx_config:get([file_transfer, enable], false).
--spec storage() -> _Storage.
+-spec storage() -> emqx_config:config().
storage() ->
emqx_config:get([file_transfer, storage]).
--spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
-gc_interval(Conf = #{type := local}) ->
- emqx_utils_maps:deep_get([segments, gc, interval], Conf);
-gc_interval(_) ->
- undefined.
+-spec gc_interval(emqx_ft_storage_fs:storage()) ->
+ emqx_maybe:t(milliseconds()).
+gc_interval(Storage) ->
+ emqx_utils_maps:deep_get([segments, gc, interval], Storage, undefined).
--spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
-segments_ttl(Conf = #{type := local}) ->
- {
- emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
- emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
- };
-segments_ttl(_) ->
- undefined.
+-spec segments_ttl(emqx_ft_storage_fs:storage()) ->
+ emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
+segments_ttl(Storage) ->
+ Min = emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Storage, undefined),
+ Max = emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Storage, undefined),
+ case is_integer(Min) andalso is_integer(Max) of
+ true ->
+ {Min, Max};
+ false ->
+ undefined
+ end.
init_timeout() ->
emqx_config:get([file_transfer, init_timeout]).
diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl
index e2eebbbb8..0440bdcb9 100644
--- a/apps/emqx_ft/src/emqx_ft_schema.erl
+++ b/apps/emqx_ft/src/emqx_ft_schema.erl
@@ -93,36 +93,30 @@ fields(file_transfer) ->
)},
{storage,
mk(
- hoconsc:union(
- fun
- (all_union_members) ->
- [ref(local_storage)];
- ({value, #{<<"type">> := <<"local">>}}) ->
- [ref(local_storage)];
- ({value, #{<<"type">> := _}}) ->
- throw(#{field_name => type, expected => "local"});
- ({value, _}) ->
- [ref(local_storage)]
- end
- ),
+ ref(storage_backend),
#{
+ desc => ?DESC("storage_backend"),
required => false,
- desc => ?DESC("storage"),
- default => #{<<"type">> => <<"local">>}
+ validator => validator(backend),
+ default => #{
+ <<"local">> => #{}
+ }
+ }
+ )}
+ ];
+fields(storage_backend) ->
+ [
+ {local,
+ mk(
+ ref(local_storage),
+ #{
+ desc => ?DESC("local_storage"),
+ required => {false, recursively}
}
)}
];
fields(local_storage) ->
[
- {type,
- mk(
- local,
- #{
- default => local,
- required => false,
- desc => ?DESC("local_type")
- }
- )},
{segments,
mk(
ref(local_storage_segments),
@@ -136,29 +130,13 @@ fields(local_storage) ->
)},
{exporter,
mk(
- hoconsc:union(
- fun
- (all_union_members) ->
- [
- ref(local_storage_exporter),
- ref(s3_exporter)
- ];
- ({value, #{<<"type">> := <<"local">>}}) ->
- [ref(local_storage_exporter)];
- ({value, #{<<"type">> := <<"s3">>}}) ->
- [ref(s3_exporter)];
- ({value, #{<<"type">> := _}}) ->
- throw(#{field_name => type, expected => "local | s3"});
- ({value, _}) ->
- % NOTE: default
- [ref(local_storage_exporter)]
- end
- ),
+ ref(local_storage_exporter_backend),
#{
- desc => ?DESC("local_storage_exporter"),
- required => true,
+ desc => ?DESC("local_storage_exporter_backend"),
+ required => false,
+ validator => validator(backend),
default => #{
- <<"type">> => <<"local">>
+ <<"local">> => #{}
}
}
)}
@@ -181,17 +159,27 @@ fields(local_storage_segments) ->
}
)}
];
-fields(local_storage_exporter) ->
+fields(local_storage_exporter_backend) ->
[
- {type,
+ {local,
mk(
- local,
+ ref(local_storage_exporter),
#{
- default => local,
- required => false,
- desc => ?DESC("local_storage_exporter_type")
+ desc => ?DESC("local_storage_exporter"),
+ required => {false, recursively}
}
)},
+ {s3,
+ mk(
+ ref(s3_exporter),
+ #{
+ desc => ?DESC("s3_exporter"),
+ required => {false, recursively}
+ }
+ )}
+ ];
+fields(local_storage_exporter) ->
+ [
{root,
mk(
binary(),
@@ -202,18 +190,7 @@ fields(local_storage_exporter) ->
)}
];
fields(s3_exporter) ->
- [
- {type,
- mk(
- s3,
- #{
- default => s3,
- required => false,
- desc => ?DESC("s3_exporter_type")
- }
- )}
- ] ++
- emqx_s3_schema:fields(s3);
+ emqx_s3_schema:fields(s3);
fields(local_storage_segments_gc) ->
[
{interval,
@@ -287,7 +264,16 @@ validator(filename) ->
byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded}
end,
fun emqx_ft_fs_util:is_filename_safe/1
- ].
+ ];
+validator(backend) ->
+ fun(Config) ->
+ case maps:keys(Config) of
+ [_Type] ->
+ ok;
+ _Conflicts = [_ | _] ->
+ {error, multiple_conflicting_backends}
+ end
+ end.
converter(checksum) ->
fun
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl
index 5ec342585..79fc2dcfc 100644
--- a/apps/emqx_ft/src/emqx_ft_storage.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage.erl
@@ -18,8 +18,6 @@
-export(
[
- child_spec/0,
-
store_filemeta/2,
store_segment/2,
assemble/2,
@@ -30,11 +28,17 @@
with_storage_type/2,
with_storage_type/3,
+ backend/0,
on_config_update/2
]
).
--type storage() :: emqx_config:config().
+-type type() :: local.
+-type backend() :: {type(), storage()}.
+-type storage() :: config().
+-type config() :: emqx_config:config().
+
+-export_type([backend/0]).
-export_type([assemble_callback/0]).
@@ -100,31 +104,20 @@
%% API
%%--------------------------------------------------------------------
--spec child_spec() ->
- [supervisor:child_spec()].
-child_spec() ->
- try
- Mod = mod(),
- Mod:child_spec(storage())
- catch
- error:disabled -> [];
- error:undef -> []
- end.
-
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
ok | {async, pid()} | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
- with_storage(store_filemeta, [Transfer, FileMeta]).
+ dispatch(store_filemeta, [Transfer, FileMeta]).
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}.
store_segment(Transfer, Segment) ->
- with_storage(store_segment, [Transfer, Segment]).
+ dispatch(store_segment, [Transfer, Segment]).
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size) ->
- with_storage(assemble, [Transfer, Size]).
+ dispatch(assemble, [Transfer, Size]).
-spec files() ->
{ok, page(file_info(), _)} | {error, term()}.
@@ -134,77 +127,75 @@ files() ->
-spec files(query(Cursor)) ->
{ok, page(file_info(), Cursor)} | {error, term()}.
files(Query) ->
- with_storage(files, [Query]).
+ dispatch(files, [Query]).
--spec with_storage(atom() | function()) -> any().
-with_storage(Fun) ->
- with_storage(Fun, []).
-
--spec with_storage(atom() | function(), list(term())) -> any().
-with_storage(Fun, Args) ->
- case storage() of
- Storage = #{} ->
- apply_storage(Storage, Fun, Args);
- undefined ->
+-spec dispatch(atom(), list(term())) -> any().
+dispatch(Fun, Args) when is_atom(Fun) ->
+ case backend() of
+ {Type, Storage} ->
+ apply(mod(Type), Fun, [Storage | Args]);
+ _ ->
{error, disabled}
end.
+%%
+
-spec with_storage_type(atom(), atom() | function()) -> any().
with_storage_type(Type, Fun) ->
with_storage_type(Type, Fun, []).
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
- with_storage(fun(Storage) ->
- case Storage of
- #{type := Type} ->
- apply_storage(Storage, Fun, Args);
- _ ->
- {error, {invalid_storage_type, Storage}}
- end
- end).
-
-apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
- apply(mod(Storage), Fun, [Storage | Args]);
-apply_storage(Storage, Fun, Args) when is_function(Fun) ->
- apply(Fun, [Storage | Args]).
+ case backend() of
+ {Type, Storage} when is_atom(Fun) ->
+ apply(mod(Type), Fun, [Storage | Args]);
+ {Type, Storage} when is_function(Fun) ->
+ apply(Fun, [Storage | Args]);
+ {_, _} = Backend ->
+ {error, {invalid_storage_backend, Backend}};
+ _ ->
+ {error, disabled}
+ end.
%%
--spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
+-spec backend() -> backend().
+backend() ->
+ backend(emqx_ft_conf:storage()).
+
+-spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
ok.
-on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
+on_config_update(ConfigOld, ConfigNew) ->
+ on_backend_update(
+ emqx_maybe:apply(fun backend/1, ConfigOld),
+ emqx_maybe:apply(fun backend/1, ConfigNew)
+ ).
+
+on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
ok;
-on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
- ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
-on_config_update(StorageOld, StorageNew) when
- (StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
- (StorageNew =:= undefined orelse is_map_key(type, StorageNew))
+on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
+ ok = (mod(Type)):on_config_update(StorageOld, StorageNew);
+on_backend_update(BackendOld, BackendNew) when
+ (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
+ (BackendNew =:= undefined orelse is_tuple(BackendNew))
->
- _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
- _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
+ _ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld),
+ _ = emqx_maybe:apply(fun on_storage_start/1, BackendNew),
ok.
%%--------------------------------------------------------------------
%% Local API
%%--------------------------------------------------------------------
-on_storage_start(Storage) ->
- (mod(Storage)):start(Storage).
+-spec backend(config()) -> backend().
+backend(#{local := Storage}) ->
+ {local, Storage}.
-on_storage_stop(Storage) ->
- (mod(Storage)):stop(Storage).
+on_storage_start({Type, Storage}) ->
+ (mod(Type)):start(Storage).
-storage() ->
- emqx_ft_conf:storage().
+on_storage_stop({Type, Storage}) ->
+ (mod(Type)):stop(Storage).
-mod() ->
- mod(storage()).
-
-mod(Storage) ->
- case Storage of
- #{type := local} ->
- emqx_ft_storage_fs;
- undefined ->
- error(disabled)
- end.
+mod(local) ->
+ emqx_ft_storage_fs.
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
index e000fe5c6..e25ab158e 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
@@ -169,15 +169,12 @@ stop({ExporterMod, ExporterOpts}) ->
exporter(Storage) ->
case maps:get(exporter, Storage) of
- #{type := local} = Options ->
- {emqx_ft_storage_exporter_fs, without_type(Options)};
- #{type := s3} = Options ->
- {emqx_ft_storage_exporter_s3, without_type(Options)}
+ #{local := Options} ->
+ {emqx_ft_storage_exporter_fs, Options};
+ #{s3 := Options} ->
+ {emqx_ft_storage_exporter_s3, Options}
end.
-without_type(#{type := _} = Options) ->
- maps:without([type], Options).
-
init_checksum(#{checksum := {Algo, _}}) ->
crypto:hash_init(Algo);
init_checksum(#{}) ->
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
index 692a270e3..713649759 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl
@@ -58,9 +58,9 @@ start_link(Storage) ->
collect() ->
gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
--spec reset() -> ok.
+-spec reset() -> ok | {error, _}.
reset() ->
- reset(emqx_ft_conf:storage()).
+ emqx_ft_storage:with_storage_type(local, fun reset/1).
-spec reset(emqx_ft_storage_fs:storage()) -> ok.
reset(Storage) ->
@@ -139,7 +139,8 @@ maybe_report(#gcstats{} = _Stats, _Storage) ->
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
start_timer(St) ->
- start_timer(gc_interval(emqx_ft_conf:storage()), St).
+ Interval = emqx_ft_storage:with_storage_type(local, fun gc_interval/1),
+ start_timer(Interval, St).
start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl
index 929665ca9..6b675e0c0 100644
--- a/apps/emqx_ft/test/emqx_ft_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl
@@ -58,16 +58,16 @@ end_per_suite(_Config) ->
set_special_configs(Config) ->
fun
(emqx_ft) ->
- Storage = emqx_ft_test_helpers:local_storage(Config),
+ % NOTE
+ % Inhibit local fs GC to simulate it isn't fast enough to collect
+ % complete transfers.
+ Storage = emqx_utils_maps:deep_merge(
+ emqx_ft_test_helpers:local_storage(Config),
+ #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
+ ),
emqx_ft_test_helpers:load_config(#{
- % NOTE
- % Inhibit local fs GC to simulate it isn't fast enough to collect
- % complete transfers.
- enable => true,
- storage => emqx_utils_maps:deep_merge(
- Storage,
- #{segments => #{gc => #{interval => 0}}}
- )
+ <<"enable">> => true,
+ <<"storage">> => Storage
});
(_) ->
ok
diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
index 24a4593c2..c1deeb3bc 100644
--- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl
@@ -246,16 +246,19 @@ exporter(Config) ->
emqx_ft_storage_exporter:exporter(storage(Config)).
storage(Config) ->
- maps:get(
- storage,
+ emqx_utils_maps:deep_get(
+ [storage, local],
emqx_ft_schema:translate(#{
<<"storage">> => #{
- <<"type">> => <<"local">>,
- <<"segments">> => #{
- <<"root">> => ?config(storage_root, Config)
- },
- <<"exporter">> => #{
- <<"root">> => ?config(exports_root, Config)
+ <<"local">> => #{
+ <<"segments">> => #{
+ <<"root">> => ?config(storage_root, Config)
+ },
+ <<"exporter">> => #{
+ <<"local">> => #{
+ <<"root">> => ?config(exports_root, Config)
+ }
+ }
}
}
})
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
index bc9eb5d98..1f53f88af 100644
--- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
@@ -34,7 +34,12 @@ end_per_suite(_Config) ->
init_per_testcase(_Case, Config) ->
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
ok = emqx_common_test_helpers:start_apps(
- [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
+ [emqx_conf, emqx_ft], fun
+ (emqx_ft) ->
+ emqx_ft_test_helpers:load_config(#{});
+ (_) ->
+ ok
+ end
),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
@@ -52,7 +57,7 @@ t_update_config(_Config) ->
{error, #{kind := validation_error}},
emqx_conf:update(
[file_transfer],
- #{<<"storage">> => #{<<"type">> => <<"unknown">>}},
+ #{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}},
#{}
)
),
@@ -63,16 +68,18 @@ t_update_config(_Config) ->
#{
<<"enable">> => true,
<<"storage">> => #{
- <<"type">> => <<"local">>,
- <<"segments">> => #{
- <<"root">> => <<"/tmp/path">>,
- <<"gc">> => #{
- <<"interval">> => <<"5m">>
+ <<"local">> => #{
+ <<"segments">> => #{
+ <<"root">> => <<"/tmp/path">>,
+ <<"gc">> => #{
+ <<"interval">> => <<"5m">>
+ }
+ },
+ <<"exporter">> => #{
+ <<"local">> => #{
+ <<"root">> => <<"/tmp/exports">>
+ }
}
- },
- <<"exporter">> => #{
- <<"type">> => <<"local">>,
- <<"root">> => <<"/tmp/exports">>
}
}
},
@@ -81,11 +88,15 @@ t_update_config(_Config) ->
),
?assertEqual(
<<"/tmp/path">>,
- emqx_config:get([file_transfer, storage, segments, root])
+ emqx_config:get([file_transfer, storage, local, segments, root])
),
?assertEqual(
5 * 60 * 1000,
- emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+ emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
+ ),
+ ?assertEqual(
+ {5 * 60, 24 * 60 * 60},
+ emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:segments_ttl/1)
).
t_disable_restore_config(Config) ->
@@ -93,13 +104,13 @@ t_disable_restore_config(Config) ->
{ok, _},
emqx_conf:update(
[file_transfer],
- #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
+ #{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}},
#{}
)
),
?assertEqual(
60 * 60 * 1000,
- emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+ emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
),
% Verify that transfers work
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <>),
@@ -117,7 +128,7 @@ t_disable_restore_config(Config) ->
emqx_ft_conf:enabled()
),
?assertMatch(
- #{type := local, exporter := #{type := local}},
+ #{local := #{exporter := #{local := _}}},
emqx_ft_conf:storage()
),
ClientId = gen_clientid(),
@@ -147,10 +158,11 @@ t_disable_restore_config(Config) ->
#{
<<"enable">> => true,
<<"storage">> => #{
- <<"type">> => <<"local">>,
- <<"segments">> => #{
- <<"root">> => Root,
- <<"gc">> => #{<<"interval">> => <<"1s">>}
+ <<"local">> => #{
+ <<"segments">> => #{
+ <<"root">> => Root,
+ <<"gc">> => #{<<"interval">> => <<"1s">>}
+ }
}
}
},
@@ -165,10 +177,7 @@ t_disable_restore_config(Config) ->
[
#{
?snk_kind := garbage_collection,
- storage := #{
- type := local,
- segments := #{root := Root}
- }
+ storage := #{segments := #{root := Root}}
}
],
?of_kind(garbage_collection, Trace)
@@ -188,48 +197,49 @@ t_switch_exporter(_Config) ->
)
),
?assertMatch(
- #{type := local, exporter := #{type := local}},
+ #{local := #{exporter := #{local := _}}},
emqx_ft_conf:storage()
),
% Verify that switching to a different exporter works
?assertMatch(
{ok, _},
emqx_conf:update(
- [file_transfer, storage, exporter],
+ [file_transfer, storage, local, exporter],
#{
- <<"type">> => <<"s3">>,
- <<"bucket">> => <<"emqx">>,
- <<"host">> => <<"https://localhost">>,
- <<"port">> => 9000,
- <<"transport_options">> => #{
- <<"ipv6_probe">> => false
+ <<"s3">> => #{
+ <<"bucket">> => <<"emqx">>,
+ <<"host">> => <<"https://localhost">>,
+ <<"port">> => 9000,
+ <<"transport_options">> => #{
+ <<"ipv6_probe">> => false
+ }
}
},
#{}
)
),
?assertMatch(
- #{type := local, exporter := #{type := s3}},
+ #{local := #{exporter := #{s3 := _}}},
emqx_ft_conf:storage()
),
% Verify that switching back to local exporter works
?assertMatch(
{ok, _},
emqx_conf:remove(
- [file_transfer, storage, exporter],
+ [file_transfer, storage, local, exporter],
#{}
)
),
?assertMatch(
{ok, _},
emqx_conf:update(
- [file_transfer, storage, exporter],
- #{<<"type">> => <<"local">>},
+ [file_transfer, storage, local, exporter],
+ #{<<"local">> => #{}},
#{}
)
),
?assertMatch(
- #{type := local, exporter := #{type := local}},
+ #{local := #{exporter := #{local := #{}}}},
emqx_ft_conf:storage()
),
% Verify that transfers work
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
index 2acb57a8e..50925cfb9 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
@@ -85,7 +85,7 @@ client_id(Config) ->
storage(Config) ->
RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
- #{storage := Storage} = emqx_ft_schema:translate(RawConfig),
+ #{storage := #{local := Storage}} = emqx_ft_schema:translate(RawConfig),
Storage.
list_files(Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index 04aedf8f3..a7ffd5675 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -36,19 +36,19 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(TC, Config) ->
+ SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
+ ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
ok = emqx_common_test_helpers:start_app(
emqx_ft,
fun(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
<<"enable">> => true,
<<"storage">> => #{
- <<"type">> => <<"local">>,
- <<"segments">> => #{
- <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
- },
- <<"exporter">> => #{
- <<"type">> => <<"local">>,
- <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
+ <<"local">> => #{
+ <<"segments">> => #{<<"root">> => SegmentsRoot},
+ <<"exporter">> => #{
+ <<"local">> => #{<<"root">> => ExportsRoot}
+ }
}
}
})
@@ -105,7 +105,7 @@ t_gc_triggers_manually(_Config) ->
).
t_gc_complete_transfers(_Config) ->
- Storage = emqx_ft_conf:storage(),
+ {local, Storage} = emqx_ft_storage:backend(),
ok = set_gc_config(minimum_segments_ttl, 0),
ok = set_gc_config(maximum_segments_ttl, 3),
ok = set_gc_config(interval, 500),
@@ -198,7 +198,7 @@ t_gc_complete_transfers(_Config) ->
t_gc_incomplete_transfers(_Config) ->
ok = set_gc_config(minimum_segments_ttl, 0),
ok = set_gc_config(maximum_segments_ttl, 4),
- Storage = emqx_ft_conf:storage(),
+ {local, Storage} = emqx_ft_storage:backend(),
Transfers = [
{
{<<"client43"/utf8>>, <<"file-🦕"/utf8>>},
@@ -269,7 +269,7 @@ t_gc_incomplete_transfers(_Config) ->
t_gc_handling_errors(_Config) ->
ok = set_gc_config(minimum_segments_ttl, 0),
ok = set_gc_config(maximum_segments_ttl, 0),
- Storage = emqx_ft_conf:storage(),
+ {local, Storage} = emqx_ft_storage:backend(),
Transfer1 = {<<"client1">>, mk_file_id()},
Transfer2 = {<<"client2">>, mk_file_id()},
Filemeta = #{name => "oops.pdf"},
@@ -325,7 +325,7 @@ t_gc_handling_errors(_Config) ->
%%
set_gc_config(Name, Value) ->
- emqx_config:put([file_transfer, storage, segments, gc, Name], Value).
+ emqx_config:put([file_transfer, storage, local, segments, gc, Name], Value).
start_transfer(Storage, {Transfer, Meta, Gen}) ->
?assertEqual(
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
index 1482223d8..2eb6d84db 100644
--- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl
+++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
@@ -54,20 +54,22 @@ local_storage(Config) ->
local_storage(Config, Opts) ->
#{
- <<"type">> => <<"local">>,
- <<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
- <<"exporter">> => exporter(Config, Opts)
+ <<"local">> => #{
+ <<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
+ <<"exporter">> => exporter(Config, Opts)
+ }
}.
exporter(Config, #{exporter := local}) ->
- #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
+ #{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}};
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
- BaseConfig#{
- <<"bucket">> => list_to_binary(BucketName),
- <<"type">> => <<"s3">>,
- <<"host">> => ?S3_HOST,
- <<"port">> => ?S3_PORT
+ #{
+ <<"s3">> => BaseConfig#{
+ <<"bucket">> => list_to_binary(BucketName),
+ <<"host">> => ?S3_HOST,
+ <<"port">> => ?S3_PORT
+ }
}.
load_config(Config) ->
diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon
index 64b9bd67e..13bbc6970 100644
--- a/rel/i18n/emqx_ft_schema.hocon
+++ b/rel/i18n/emqx_ft_schema.hocon
@@ -18,11 +18,11 @@ store_segment_timeout.desc:
"""Timeout for storing a file segment.
After reaching the timeout, message with the segment will be acked with an error"""
-storage.desc:
+storage_backend.desc:
"""Storage settings for file transfer."""
-local_type.desc:
-"""Use local file system to store uploaded fragments and temporary data."""
+local_storage.desc:
+"""Local file system backend to store uploaded fragments and temporary data."""
local_storage_segments.desc:
"""Settings for local segments storage, which include uploaded transfer fragments and temporary data."""
@@ -30,15 +30,15 @@ local_storage_segments.desc:
local_storage_segments_root.desc:
"""File system path to keep uploaded fragments and temporary data."""
-local_storage_exporter.desc:
+local_storage_exporter_backend.desc:
"""Exporter for the local file system storage backend.
Exporter defines where and how fully transferred and assembled files are stored."""
-local_storage_exporter_type.desc:
-"""Exporter type for the exporter to the local file system"""
+local_storage_exporter.desc:
+"""Exporter to the local file system."""
-s3_exporter_type.desc:
-"""Exporter type for the exporter to S3"""
+s3_exporter.desc:
+"""Exporter to the S3 API compatible object storage."""
local_storage_exporter_root.desc:
"""File system path to keep uploaded files."""