Merge pull request #10618 from keynslug/ft/EMQX-9678/config-schema

feat(ft-conf): simplify schema of storage / exporter backends
Ilya Averyanov 2023-05-10 10:49:01 +05:00 committed by GitHub
commit 0856522b13
14 changed files with 231 additions and 238 deletions

View File

@@ -54,24 +54,26 @@
 enabled() ->
     emqx_config:get([file_transfer, enable], false).
 
--spec storage() -> _Storage.
+-spec storage() -> emqx_config:config().
 storage() ->
     emqx_config:get([file_transfer, storage]).
 
--spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
-gc_interval(Conf = #{type := local}) ->
-    emqx_utils_maps:deep_get([segments, gc, interval], Conf);
-gc_interval(_) ->
-    undefined.
+-spec gc_interval(emqx_ft_storage_fs:storage()) ->
+    emqx_maybe:t(milliseconds()).
+gc_interval(Storage) ->
+    emqx_utils_maps:deep_get([segments, gc, interval], Storage, undefined).
 
--spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
-segments_ttl(Conf = #{type := local}) ->
-    {
-        emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
-        emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
-    };
-segments_ttl(_) ->
-    undefined.
+-spec segments_ttl(emqx_ft_storage_fs:storage()) ->
+    emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
+segments_ttl(Storage) ->
+    Min = emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Storage, undefined),
+    Max = emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Storage, undefined),
+    case is_integer(Min) andalso is_integer(Max) of
+        true ->
+            {Min, Max};
+        false ->
+            undefined
+    end.
 
 init_timeout() ->
     emqx_config:get([file_transfer, init_timeout]).
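The reworked accessors read the (already backend-specific) local storage map and fall back to undefined instead of pattern-matching on a type field. A minimal sketch of the resulting behaviour, wrapped in a hypothetical helper with illustrative values (not part of the patch):

    %% Hypothetical helper, not part of the patch: exercises the new accessors.
    ttl_example() ->
        Storage = #{
            segments => #{
                gc => #{
                    interval => 3600000,
                    minimum_segments_ttl => 300,
                    maximum_segments_ttl => 86400
                }
            }
        },
        3600000 = emqx_ft_conf:gc_interval(Storage),
        {300, 86400} = emqx_ft_conf:segments_ttl(Storage),
        %% Missing keys no longer hit a fallback clause; deep_get defaults to `undefined`.
        undefined = emqx_ft_conf:gc_interval(#{}),
        undefined = emqx_ft_conf:segments_ttl(#{}).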

View File

@@ -93,36 +93,30 @@ fields(file_transfer) ->
         )},
         {storage,
             mk(
-                hoconsc:union(
-                    fun
-                        (all_union_members) ->
-                            [ref(local_storage)];
-                        ({value, #{<<"type">> := <<"local">>}}) ->
-                            [ref(local_storage)];
-                        ({value, #{<<"type">> := _}}) ->
-                            throw(#{field_name => type, expected => "local"});
-                        ({value, _}) ->
-                            [ref(local_storage)]
-                    end
-                ),
+                ref(storage_backend),
                 #{
+                    desc => ?DESC("storage_backend"),
                     required => false,
-                    desc => ?DESC("storage"),
-                    default => #{<<"type">> => <<"local">>}
+                    validator => validator(backend),
+                    default => #{
+                        <<"local">> => #{}
+                    }
+                }
+            )}
+    ];
+fields(storage_backend) ->
+    [
+        {local,
+            mk(
+                ref(local_storage),
+                #{
+                    desc => ?DESC("local_storage"),
+                    required => {false, recursively}
                 }
             )}
     ];
 fields(local_storage) ->
     [
-        {type,
-            mk(
-                local,
-                #{
-                    default => local,
-                    required => false,
-                    desc => ?DESC("local_type")
-                }
-            )},
         {segments,
             mk(
                 ref(local_storage_segments),
@@ -136,29 +130,13 @@ fields(local_storage) ->
             )},
         {exporter,
             mk(
-                hoconsc:union(
-                    fun
-                        (all_union_members) ->
-                            [
-                                ref(local_storage_exporter),
-                                ref(s3_exporter)
-                            ];
-                        ({value, #{<<"type">> := <<"local">>}}) ->
-                            [ref(local_storage_exporter)];
-                        ({value, #{<<"type">> := <<"s3">>}}) ->
-                            [ref(s3_exporter)];
-                        ({value, #{<<"type">> := _}}) ->
-                            throw(#{field_name => type, expected => "local | s3"});
-                        ({value, _}) ->
-                            % NOTE: default
-                            [ref(local_storage_exporter)]
-                    end
-                ),
+                ref(local_storage_exporter_backend),
                 #{
-                    desc => ?DESC("local_storage_exporter"),
-                    required => true,
+                    desc => ?DESC("local_storage_exporter_backend"),
+                    required => false,
+                    validator => validator(backend),
                     default => #{
-                        <<"type">> => <<"local">>
+                        <<"local">> => #{}
                     }
                 }
             )}
@@ -181,17 +159,27 @@ fields(local_storage_segments) ->
                 }
             )}
     ];
-fields(local_storage_exporter) ->
+fields(local_storage_exporter_backend) ->
     [
-        {type,
+        {local,
             mk(
-                local,
+                ref(local_storage_exporter),
                 #{
-                    default => local,
-                    required => false,
-                    desc => ?DESC("local_storage_exporter_type")
+                    desc => ?DESC("local_storage_exporter"),
+                    required => {false, recursively}
                 }
             )},
+        {s3,
+            mk(
+                ref(s3_exporter),
+                #{
+                    desc => ?DESC("s3_exporter"),
+                    required => {false, recursively}
+                }
+            )}
+    ];
+fields(local_storage_exporter) ->
+    [
         {root,
             mk(
                 binary(),
@@ -202,17 +190,6 @@ fields(local_storage_exporter) ->
             )}
     ];
 fields(s3_exporter) ->
-    [
-        {type,
-            mk(
-                s3,
-                #{
-                    default => s3,
-                    required => false,
-                    desc => ?DESC("s3_exporter_type")
-                }
-            )}
-    ] ++
     emqx_s3_schema:fields(s3);
 fields(local_storage_segments_gc) ->
     [
@@ -260,6 +237,10 @@ desc(s3_exporter) ->
     "S3 Exporter settings for the File transfer local storage backend";
 desc(local_storage_segments_gc) ->
     "Garbage collection settings for the File transfer local segments storage";
+desc(local_storage_exporter_backend) ->
+    "Exporter for the local file system storage backend";
+desc(storage_backend) ->
+    "Storage backend settings for file transfer";
 desc(_) ->
     undefined.
@@ -287,7 +268,16 @@ validator(filename) ->
             byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded}
         end,
         fun emqx_ft_fs_util:is_filename_safe/1
-    ].
+    ];
+validator(backend) ->
+    fun(Config) ->
+        case maps:keys(Config) of
+            [_Type] ->
+                ok;
+            _Conflicts = [_ | _] ->
+                {error, multiple_conflicting_backends}
+        end
+    end.
 
 converter(checksum) ->
     fun
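Taken together, these schema hunks replace the type discriminator and the hoconsc:union member selectors with plain nested structs: a backend is chosen by making its name the single key of the storage (or exporter) map, and validator(backend) rejects maps naming more than one backend. A hypothetical fragment contrasting the two raw-config shapes (not part of the patch; key layout taken from the test changes below):

    %% Hypothetical fragment, not part of the patch: old vs. new raw config shape.
    storage_shapes() ->
        %% Old, discriminated by a "type" field:
        _Old = #{
            <<"storage">> => #{
                <<"type">> => <<"local">>,
                <<"exporter">> => #{<<"type">> => <<"local">>}
            }
        },
        %% New: the backend name is the single key; validator(backend) returns
        %% {error, multiple_conflicting_backends} if more than one key is present.
        _New = #{
            <<"storage">> => #{
                <<"local">> => #{
                    <<"exporter">> => #{<<"local">> => #{}}
                }
            }
        },
        ok.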

View File

@@ -18,8 +18,6 @@
 
 -export(
     [
-        child_spec/0,
-
         store_filemeta/2,
         store_segment/2,
         assemble/2,
@@ -30,11 +28,17 @@
         with_storage_type/2,
         with_storage_type/3,
 
+        backend/0,
+
         on_config_update/2
     ]
 ).
 
--type storage() :: emqx_config:config().
+-type type() :: local.
+-type backend() :: {type(), storage()}.
+-type storage() :: config().
+-type config() :: emqx_config:config().
+
+-export_type([backend/0]).
 
 -export_type([assemble_callback/0]).
@@ -100,31 +104,20 @@
 %% API
 %%--------------------------------------------------------------------
 
--spec child_spec() ->
-    [supervisor:child_spec()].
-child_spec() ->
-    try
-        Mod = mod(),
-        Mod:child_spec(storage())
-    catch
-        error:disabled -> [];
-        error:undef -> []
-    end.
-
 -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
     ok | {async, pid()} | {error, term()}.
 store_filemeta(Transfer, FileMeta) ->
-    with_storage(store_filemeta, [Transfer, FileMeta]).
+    dispatch(store_filemeta, [Transfer, FileMeta]).
 
 -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
     ok | {async, pid()} | {error, term()}.
 store_segment(Transfer, Segment) ->
-    with_storage(store_segment, [Transfer, Segment]).
+    dispatch(store_segment, [Transfer, Segment]).
 
 -spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
     ok | {async, pid()} | {error, term()}.
 assemble(Transfer, Size) ->
-    with_storage(assemble, [Transfer, Size]).
+    dispatch(assemble, [Transfer, Size]).
 
 -spec files() ->
     {ok, page(file_info(), _)} | {error, term()}.
@@ -134,20 +127,14 @@ files() ->
 -spec files(query(Cursor)) ->
     {ok, page(file_info(), Cursor)} | {error, term()}.
 files(Query) ->
-    with_storage(files, [Query]).
+    dispatch(files, [Query]).
 
--spec with_storage(atom() | function()) -> any().
-with_storage(Fun) ->
-    with_storage(Fun, []).
-
--spec with_storage(atom() | function(), list(term())) -> any().
-with_storage(Fun, Args) ->
-    case storage() of
-        Storage = #{} ->
-            apply_storage(Storage, Fun, Args);
-        undefined ->
-            {error, disabled}
-    end.
+-spec dispatch(atom(), list(term())) -> any().
+dispatch(Fun, Args) when is_atom(Fun) ->
+    {Type, Storage} = backend(),
+    apply(mod(Type), Fun, [Storage | Args]).
+
+%%
 
 -spec with_storage_type(atom(), atom() | function()) -> any().
 with_storage_type(Type, Fun) ->
@@ -155,56 +142,54 @@ with_storage_type(Type, Fun) ->
 -spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
 with_storage_type(Type, Fun, Args) ->
-    with_storage(fun(Storage) ->
-        case Storage of
-            #{type := Type} ->
-                apply_storage(Storage, Fun, Args);
-            _ ->
-                {error, {invalid_storage_type, Storage}}
-        end
-    end).
-
-apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
-    apply(mod(Storage), Fun, [Storage | Args]);
-apply_storage(Storage, Fun, Args) when is_function(Fun) ->
-    apply(Fun, [Storage | Args]).
+    case backend() of
+        {Type, Storage} when is_atom(Fun) ->
+            apply(mod(Type), Fun, [Storage | Args]);
+        {Type, Storage} when is_function(Fun) ->
+            apply(Fun, [Storage | Args]);
+        {_, _} = Backend ->
+            {error, {invalid_storage_backend, Backend}}
+    end.
 
 %%
 
--spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
+-spec backend() -> backend().
+backend() ->
+    backend(emqx_ft_conf:storage()).
+
+-spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
     ok.
-on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
+on_config_update(ConfigOld, ConfigNew) ->
+    on_backend_update(
+        emqx_maybe:apply(fun backend/1, ConfigOld),
+        emqx_maybe:apply(fun backend/1, ConfigNew)
+    ).
+
+on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
     ok;
-on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
-    ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
-on_config_update(StorageOld, StorageNew) when
-    (StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
-    (StorageNew =:= undefined orelse is_map_key(type, StorageNew))
+on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
+    ok = (mod(Type)):on_config_update(StorageOld, StorageNew);
+on_backend_update(BackendOld, BackendNew) when
+    (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
+    (BackendNew =:= undefined orelse is_tuple(BackendNew))
 ->
-    _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
-    _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
+    _ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld),
+    _ = emqx_maybe:apply(fun on_storage_start/1, BackendNew),
     ok.
 
 %%--------------------------------------------------------------------
 %% Local API
 %%--------------------------------------------------------------------
 
-on_storage_start(Storage) ->
-    (mod(Storage)):start(Storage).
+-spec backend(config()) -> backend().
+backend(#{local := Storage}) ->
+    {local, Storage}.
 
-on_storage_stop(Storage) ->
-    (mod(Storage)):stop(Storage).
+on_storage_start({Type, Storage}) ->
+    (mod(Type)):start(Storage).
 
-storage() ->
-    emqx_ft_conf:storage().
+on_storage_stop({Type, Storage}) ->
+    (mod(Type)):stop(Storage).
 
-mod() ->
-    mod(storage()).
-
-mod(Storage) ->
-    case Storage of
-        #{type := local} ->
-            emqx_ft_storage_fs;
-        undefined ->
-            error(disabled)
-    end.
+mod(local) ->
+    emqx_ft_storage_fs.
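With the union gone, emqx_ft_storage models configuration as a tagged backend() tuple: backend/1 turns #{local := Storage} into {local, Storage}, and dispatch/2 resolves the callback module from the tag via mod/1. A hypothetical fragment showing what a dispatched call reduces to for the local backend (assumes a running node with file transfer enabled):

    %% Hypothetical fragment, not part of the patch.
    dispatch_example(Query) ->
        {local, Storage} = emqx_ft_storage:backend(),
        %% For the `local` backend, dispatch(files, [Query]) boils down to:
        apply(emqx_ft_storage_fs, files, [Storage, Query]).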

View File

@@ -169,15 +169,12 @@ stop({ExporterMod, ExporterOpts}) ->
 
 exporter(Storage) ->
     case maps:get(exporter, Storage) of
-        #{type := local} = Options ->
-            {emqx_ft_storage_exporter_fs, without_type(Options)};
-        #{type := s3} = Options ->
-            {emqx_ft_storage_exporter_s3, without_type(Options)}
+        #{local := Options} ->
+            {emqx_ft_storage_exporter_fs, Options};
+        #{s3 := Options} ->
+            {emqx_ft_storage_exporter_s3, Options}
     end.
 
-without_type(#{type := _} = Options) ->
-    maps:without([type], Options).
-
 init_checksum(#{checksum := {Algo, _}}) ->
     crypto:hash_init(Algo);
 init_checksum(#{}) ->
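Exporter selection follows the same pattern: the options map found under the local or s3 key is already exporter-specific, so the old without_type/1 stripping step disappears. A hypothetical pair of clauses mirroring exporter/1 above (not part of the patch):

    %% Hypothetical helper, not part of the patch.
    exporter_module(#{exporter := #{local := _}}) -> emqx_ft_storage_exporter_fs;
    exporter_module(#{exporter := #{s3 := _}}) -> emqx_ft_storage_exporter_s3.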

View File

@@ -257,6 +257,9 @@ read_exportinfo(
     Transfer = dirnames_to_transfer(ClientId, FileId),
     Info = mk_exportinfo(Options, Filename, RelFilepath, Transfer, Fileinfo),
     [Info | Acc];
+read_exportinfo(_Options, {node, _Root = "", {error, enoent}, []}, Acc) ->
+    % NOTE: Root directory does not exist, this is not an error.
+    Acc;
 read_exportinfo(Options, Entry, Acc) ->
     ok = log_invalid_entry(Options, Entry),
     Acc.

View File

@@ -58,9 +58,9 @@ start_link(Storage) ->
 collect() ->
     gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
 
--spec reset() -> ok.
+-spec reset() -> ok | {error, _}.
 reset() ->
-    reset(emqx_ft_conf:storage()).
+    emqx_ft_storage:with_storage_type(local, fun reset/1).
 
 -spec reset(emqx_ft_storage_fs:storage()) -> ok.
 reset(Storage) ->
@@ -139,7 +139,8 @@ maybe_report(#gcstats{} = _Stats, _Storage) ->
     ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
 
 start_timer(St) ->
-    start_timer(gc_interval(emqx_ft_conf:storage()), St).
+    Interval = emqx_ft_storage:with_storage_type(local, fun gc_interval/1),
+    start_timer(Interval, St).
 
 start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
     St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
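Since GC settings now live under the local backend key, the GC process resolves its interval through the storage layer rather than reading emqx_ft_conf:storage() directly. A hypothetical wrapper around the same call the patch uses:

    %% Hypothetical helper, not part of the patch. Returns the interval in
    %% milliseconds, `undefined` when unset, or an error tuple when the
    %% configured backend is not `local`.
    gc_interval_example() ->
        emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1).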

View File

@@ -58,16 +58,16 @@ end_per_suite(_Config) ->
 set_special_configs(Config) ->
     fun
         (emqx_ft) ->
-            Storage = emqx_ft_test_helpers:local_storage(Config),
-            emqx_ft_test_helpers:load_config(#{
-                % NOTE
-                % Inhibit local fs GC to simulate it isn't fast enough to collect
-                % complete transfers.
-                enable => true,
-                storage => emqx_utils_maps:deep_merge(
-                    Storage,
-                    #{segments => #{gc => #{interval => 0}}}
-                )
+            % NOTE
+            % Inhibit local fs GC to simulate it isn't fast enough to collect
+            % complete transfers.
+            Storage = emqx_utils_maps:deep_merge(
+                emqx_ft_test_helpers:local_storage(Config),
+                #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
+            ),
+            emqx_ft_test_helpers:load_config(#{
+                <<"enable">> => true,
+                <<"storage">> => Storage
             });
         (_) ->
             ok

View File

@@ -246,17 +246,20 @@ exporter(Config) ->
     emqx_ft_storage_exporter:exporter(storage(Config)).
 
 storage(Config) ->
-    maps:get(
-        storage,
+    emqx_utils_maps:deep_get(
+        [storage, local],
         emqx_ft_schema:translate(#{
             <<"storage">> => #{
-                <<"type">> => <<"local">>,
-                <<"segments">> => #{
-                    <<"root">> => ?config(storage_root, Config)
-                },
-                <<"exporter">> => #{
-                    <<"root">> => ?config(exports_root, Config)
+                <<"local">> => #{
+                    <<"segments">> => #{
+                        <<"root">> => ?config(storage_root, Config)
+                    },
+                    <<"exporter">> => #{
+                        <<"local">> => #{
+                            <<"root">> => ?config(exports_root, Config)
+                        }
+                    }
                 }
             }
         })
     ).

View File

@@ -34,7 +34,12 @@ end_per_suite(_Config) ->
 init_per_testcase(_Case, Config) ->
     _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
     ok = emqx_common_test_helpers:start_apps(
-        [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
+        [emqx_conf, emqx_ft], fun
+            (emqx_ft) ->
+                emqx_ft_test_helpers:load_config(#{});
+            (_) ->
+                ok
+        end
     ),
     {ok, _} = emqx:update_config([rpc, port_discovery], manual),
     Config.
@@ -52,7 +57,7 @@ t_update_config(_Config) ->
         {error, #{kind := validation_error}},
         emqx_conf:update(
             [file_transfer],
-            #{<<"storage">> => #{<<"type">> => <<"unknown">>}},
+            #{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}},
             #{}
         )
     ),
@@ -63,7 +68,7 @@ t_update_config(_Config) ->
             #{
                 <<"enable">> => true,
                 <<"storage">> => #{
-                    <<"type">> => <<"local">>,
+                    <<"local">> => #{
                         <<"segments">> => #{
                             <<"root">> => <<"/tmp/path">>,
                             <<"gc">> => #{
@@ -71,21 +76,27 @@ t_update_config(_Config) ->
                             }
                         },
                         <<"exporter">> => #{
-                            <<"type">> => <<"local">>,
+                            <<"local">> => #{
                                 <<"root">> => <<"/tmp/exports">>
                             }
                         }
+                    }
+                }
             },
             #{}
         )
     ),
 
     ?assertEqual(
         <<"/tmp/path">>,
-        emqx_config:get([file_transfer, storage, segments, root])
+        emqx_config:get([file_transfer, storage, local, segments, root])
     ),
 
     ?assertEqual(
         5 * 60 * 1000,
-        emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+        emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
+    ),
+    ?assertEqual(
+        {5 * 60, 24 * 60 * 60},
+        emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:segments_ttl/1)
     ).
@@ -93,13 +104,13 @@ t_disable_restore_config(Config) ->
         {ok, _},
         emqx_conf:update(
             [file_transfer],
-            #{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
+            #{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}},
            #{}
         )
     ),
     ?assertEqual(
         60 * 60 * 1000,
-        emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
+        emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
     ),
     % Verify that transfers work
     ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
@@ -117,7 +128,7 @@ t_disable_restore_config(Config) ->
         emqx_ft_conf:enabled()
     ),
     ?assertMatch(
-        #{type := local, exporter := #{type := local}},
+        #{local := #{exporter := #{local := _}}},
         emqx_ft_conf:storage()
     ),
     ClientId = gen_clientid(),
@@ -147,12 +158,13 @@ t_disable_restore_config(Config) ->
             #{
                 <<"enable">> => true,
                 <<"storage">> => #{
-                    <<"type">> => <<"local">>,
+                    <<"local">> => #{
                         <<"segments">> => #{
                             <<"root">> => Root,
                             <<"gc">> => #{<<"interval">> => <<"1s">>}
                         }
                     }
+                }
             },
             #{}
         )
@@ -165,10 +177,7 @@ t_disable_restore_config(Config) ->
         [
             #{
                 ?snk_kind := garbage_collection,
-                storage := #{
-                    type := local,
-                    segments := #{root := Root}
-                }
+                storage := #{segments := #{root := Root}}
             }
         ],
         ?of_kind(garbage_collection, Trace)
@@ -188,48 +197,49 @@ t_switch_exporter(_Config) ->
         )
     ),
     ?assertMatch(
-        #{type := local, exporter := #{type := local}},
+        #{local := #{exporter := #{local := _}}},
         emqx_ft_conf:storage()
     ),
     % Verify that switching to a different exporter works
     ?assertMatch(
         {ok, _},
         emqx_conf:update(
-            [file_transfer, storage, exporter],
+            [file_transfer, storage, local, exporter],
             #{
-                <<"type">> => <<"s3">>,
+                <<"s3">> => #{
                     <<"bucket">> => <<"emqx">>,
                     <<"host">> => <<"https://localhost">>,
                     <<"port">> => 9000,
                     <<"transport_options">> => #{
                         <<"ipv6_probe">> => false
                     }
+                }
             },
             #{}
         )
     ),
     ?assertMatch(
-        #{type := local, exporter := #{type := s3}},
+        #{local := #{exporter := #{s3 := _}}},
         emqx_ft_conf:storage()
     ),
     % Verify that switching back to local exporter works
     ?assertMatch(
         {ok, _},
         emqx_conf:remove(
-            [file_transfer, storage, exporter],
+            [file_transfer, storage, local, exporter],
            #{}
         )
     ),
     ?assertMatch(
         {ok, _},
         emqx_conf:update(
-            [file_transfer, storage, exporter],
-            #{<<"type">> => <<"local">>},
+            [file_transfer, storage, local, exporter],
+            #{<<"local">> => #{}},
            #{}
        )
    ),
     ?assertMatch(
-        #{type := local, exporter := #{type := local}},
+        #{local := #{exporter := #{local := #{}}}},
         emqx_ft_conf:storage()
     ),
     % Verify that transfers work

View File

@@ -111,7 +111,7 @@ t_upload_error(Config) ->
     Data = <<"data"/utf8>>,
     {ok, _} = emqx_conf:update(
-        [file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{}
+        [file_transfer, storage, local, exporter, s3, bucket], <<"invalid-bucket">>, #{}
     ),
     ?assertEqual(

View File

@@ -85,7 +85,7 @@ client_id(Config) ->
 storage(Config) ->
     RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
-    #{storage := Storage} = emqx_ft_schema:translate(RawConfig),
+    #{storage := #{local := Storage}} = emqx_ft_schema:translate(RawConfig),
     Storage.
 
 list_files(Config) ->

View File

@@ -36,19 +36,19 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(TC, Config) ->
+    SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
+    ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
     ok = emqx_common_test_helpers:start_app(
         emqx_ft,
         fun(emqx_ft) ->
             emqx_ft_test_helpers:load_config(#{
                 <<"enable">> => true,
                 <<"storage">> => #{
-                    <<"type">> => <<"local">>,
-                    <<"segments">> => #{
-                        <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
-                    },
-                    <<"exporter">> => #{
-                        <<"type">> => <<"local">>,
-                        <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
+                    <<"local">> => #{
+                        <<"segments">> => #{<<"root">> => SegmentsRoot},
+                        <<"exporter">> => #{
+                            <<"local">> => #{<<"root">> => ExportsRoot}
+                        }
                     }
                 }
             })
@@ -105,7 +105,7 @@ t_gc_triggers_manually(_Config) ->
     ).
 
 t_gc_complete_transfers(_Config) ->
-    Storage = emqx_ft_conf:storage(),
+    {local, Storage} = emqx_ft_storage:backend(),
     ok = set_gc_config(minimum_segments_ttl, 0),
     ok = set_gc_config(maximum_segments_ttl, 3),
     ok = set_gc_config(interval, 500),
@@ -198,7 +198,7 @@ t_gc_complete_transfers(_Config) ->
 t_gc_incomplete_transfers(_Config) ->
     ok = set_gc_config(minimum_segments_ttl, 0),
     ok = set_gc_config(maximum_segments_ttl, 4),
-    Storage = emqx_ft_conf:storage(),
+    {local, Storage} = emqx_ft_storage:backend(),
     Transfers = [
         {
             {<<"client43"/utf8>>, <<"file-🦕"/utf8>>},
@@ -269,7 +269,7 @@ t_gc_incomplete_transfers(_Config) ->
 t_gc_handling_errors(_Config) ->
     ok = set_gc_config(minimum_segments_ttl, 0),
     ok = set_gc_config(maximum_segments_ttl, 0),
-    Storage = emqx_ft_conf:storage(),
+    {local, Storage} = emqx_ft_storage:backend(),
     Transfer1 = {<<"client1">>, mk_file_id()},
     Transfer2 = {<<"client2">>, mk_file_id()},
     Filemeta = #{name => "oops.pdf"},
@@ -325,7 +325,7 @@ t_gc_handling_errors(_Config) ->
 %%
 
 set_gc_config(Name, Value) ->
-    emqx_config:put([file_transfer, storage, segments, gc, Name], Value).
+    emqx_config:put([file_transfer, storage, local, segments, gc, Name], Value).
 
 start_transfer(Storage, {Transfer, Meta, Gen}) ->
     ?assertEqual(

View File

@@ -54,20 +54,22 @@ local_storage(Config) ->
 local_storage(Config, Opts) ->
     #{
-        <<"type">> => <<"local">>,
+        <<"local">> => #{
             <<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
             <<"exporter">> => exporter(Config, Opts)
+        }
     }.
 
 exporter(Config, #{exporter := local}) ->
-    #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
+    #{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}};
 exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
     BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
-    BaseConfig#{
+    #{
+        <<"s3">> => BaseConfig#{
             <<"bucket">> => list_to_binary(BucketName),
-            <<"type">> => <<"s3">>,
             <<"host">> => ?S3_HOST,
             <<"port">> => ?S3_PORT
+        }
     }.
 
 load_config(Config) ->

View File

@@ -18,11 +18,11 @@ store_segment_timeout.desc:
 """Timeout for storing a file segment.<br/>
 After reaching the timeout, message with the segment will be acked with an error"""
 
-storage.desc:
+storage_backend.desc:
 """Storage settings for file transfer."""
 
-local_type.desc:
-"""Use local file system to store uploaded fragments and temporary data."""
+local_storage.desc:
+"""Local file system backend to store uploaded fragments and temporary data."""
 
 local_storage_segments.desc:
 """Settings for local segments storage, which include uploaded transfer fragments and temporary data."""
@@ -30,15 +30,15 @@ local_storage_segments.desc:
 local_storage_segments_root.desc:
 """File system path to keep uploaded fragments and temporary data."""
 
-local_storage_exporter.desc:
+local_storage_exporter_backend.desc:
 """Exporter for the local file system storage backend.<br/>
 Exporter defines where and how fully transferred and assembled files are stored."""
 
-local_storage_exporter_type.desc:
-"""Exporter type for the exporter to the local file system"""
+local_storage_exporter.desc:
+"""Exporter to the local file system."""
 
-s3_exporter_type.desc:
-"""Exporter type for the exporter to S3"""
+s3_exporter.desc:
+"""Exporter to the S3 API compatible object storage."""
 
 local_storage_exporter_root.desc:
 """File system path to keep uploaded files."""