feat(ft-conf): simplify schema of storage / exporter backends
Assumption is this changes will make `emqx_ft` config schema user-friendlier and also more future-proof.
This commit is contained in:
parent
7fa166f034
commit
5e5f854ce1
|
@ -54,24 +54,26 @@
|
||||||
enabled() ->
|
enabled() ->
|
||||||
emqx_config:get([file_transfer, enable], false).
|
emqx_config:get([file_transfer, enable], false).
|
||||||
|
|
||||||
-spec storage() -> _Storage.
|
-spec storage() -> emqx_config:config().
|
||||||
storage() ->
|
storage() ->
|
||||||
emqx_config:get([file_transfer, storage]).
|
emqx_config:get([file_transfer, storage]).
|
||||||
|
|
||||||
-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
|
-spec gc_interval(emqx_ft_storage_fs:storage()) ->
|
||||||
gc_interval(Conf = #{type := local}) ->
|
emqx_maybe:t(milliseconds()).
|
||||||
emqx_utils_maps:deep_get([segments, gc, interval], Conf);
|
gc_interval(Storage) ->
|
||||||
gc_interval(_) ->
|
emqx_utils_maps:deep_get([segments, gc, interval], Storage, undefined).
|
||||||
undefined.
|
|
||||||
|
|
||||||
-spec segments_ttl(_Storage) -> emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
|
-spec segments_ttl(emqx_ft_storage_fs:storage()) ->
|
||||||
segments_ttl(Conf = #{type := local}) ->
|
emqx_maybe:t({_Min :: seconds(), _Max :: seconds()}).
|
||||||
{
|
segments_ttl(Storage) ->
|
||||||
emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Conf),
|
Min = emqx_utils_maps:deep_get([segments, gc, minimum_segments_ttl], Storage, undefined),
|
||||||
emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Conf)
|
Max = emqx_utils_maps:deep_get([segments, gc, maximum_segments_ttl], Storage, undefined),
|
||||||
};
|
case is_integer(Min) andalso is_integer(Max) of
|
||||||
segments_ttl(_) ->
|
true ->
|
||||||
undefined.
|
{Min, Max};
|
||||||
|
false ->
|
||||||
|
undefined
|
||||||
|
end.
|
||||||
|
|
||||||
init_timeout() ->
|
init_timeout() ->
|
||||||
emqx_config:get([file_transfer, init_timeout]).
|
emqx_config:get([file_transfer, init_timeout]).
|
||||||
|
|
|
@ -93,36 +93,30 @@ fields(file_transfer) ->
|
||||||
)},
|
)},
|
||||||
{storage,
|
{storage,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union(
|
ref(storage_backend),
|
||||||
fun
|
|
||||||
(all_union_members) ->
|
|
||||||
[ref(local_storage)];
|
|
||||||
({value, #{<<"type">> := <<"local">>}}) ->
|
|
||||||
[ref(local_storage)];
|
|
||||||
({value, #{<<"type">> := _}}) ->
|
|
||||||
throw(#{field_name => type, expected => "local"});
|
|
||||||
({value, _}) ->
|
|
||||||
[ref(local_storage)]
|
|
||||||
end
|
|
||||||
),
|
|
||||||
#{
|
#{
|
||||||
|
desc => ?DESC("storage_backend"),
|
||||||
required => false,
|
required => false,
|
||||||
desc => ?DESC("storage"),
|
validator => validator(backend),
|
||||||
default => #{<<"type">> => <<"local">>}
|
default => #{
|
||||||
|
<<"local">> => #{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(storage_backend) ->
|
||||||
|
[
|
||||||
|
{local,
|
||||||
|
mk(
|
||||||
|
ref(local_storage),
|
||||||
|
#{
|
||||||
|
desc => ?DESC("local_storage"),
|
||||||
|
required => {false, recursively}
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(local_storage) ->
|
fields(local_storage) ->
|
||||||
[
|
[
|
||||||
{type,
|
|
||||||
mk(
|
|
||||||
local,
|
|
||||||
#{
|
|
||||||
default => local,
|
|
||||||
required => false,
|
|
||||||
desc => ?DESC("local_type")
|
|
||||||
}
|
|
||||||
)},
|
|
||||||
{segments,
|
{segments,
|
||||||
mk(
|
mk(
|
||||||
ref(local_storage_segments),
|
ref(local_storage_segments),
|
||||||
|
@ -136,29 +130,13 @@ fields(local_storage) ->
|
||||||
)},
|
)},
|
||||||
{exporter,
|
{exporter,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:union(
|
ref(local_storage_exporter_backend),
|
||||||
fun
|
|
||||||
(all_union_members) ->
|
|
||||||
[
|
|
||||||
ref(local_storage_exporter),
|
|
||||||
ref(s3_exporter)
|
|
||||||
];
|
|
||||||
({value, #{<<"type">> := <<"local">>}}) ->
|
|
||||||
[ref(local_storage_exporter)];
|
|
||||||
({value, #{<<"type">> := <<"s3">>}}) ->
|
|
||||||
[ref(s3_exporter)];
|
|
||||||
({value, #{<<"type">> := _}}) ->
|
|
||||||
throw(#{field_name => type, expected => "local | s3"});
|
|
||||||
({value, _}) ->
|
|
||||||
% NOTE: default
|
|
||||||
[ref(local_storage_exporter)]
|
|
||||||
end
|
|
||||||
),
|
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("local_storage_exporter"),
|
desc => ?DESC("local_storage_exporter_backend"),
|
||||||
required => true,
|
required => false,
|
||||||
|
validator => validator(backend),
|
||||||
default => #{
|
default => #{
|
||||||
<<"type">> => <<"local">>
|
<<"local">> => #{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
|
@ -181,17 +159,27 @@ fields(local_storage_segments) ->
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(local_storage_exporter) ->
|
fields(local_storage_exporter_backend) ->
|
||||||
[
|
[
|
||||||
{type,
|
{local,
|
||||||
mk(
|
mk(
|
||||||
local,
|
ref(local_storage_exporter),
|
||||||
#{
|
#{
|
||||||
default => local,
|
desc => ?DESC("local_storage_exporter"),
|
||||||
required => false,
|
required => {false, recursively}
|
||||||
desc => ?DESC("local_storage_exporter_type")
|
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{s3,
|
||||||
|
mk(
|
||||||
|
ref(s3_exporter),
|
||||||
|
#{
|
||||||
|
desc => ?DESC("s3_exporter"),
|
||||||
|
required => {false, recursively}
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields(local_storage_exporter) ->
|
||||||
|
[
|
||||||
{root,
|
{root,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
binary(),
|
||||||
|
@ -202,17 +190,6 @@ fields(local_storage_exporter) ->
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(s3_exporter) ->
|
fields(s3_exporter) ->
|
||||||
[
|
|
||||||
{type,
|
|
||||||
mk(
|
|
||||||
s3,
|
|
||||||
#{
|
|
||||||
default => s3,
|
|
||||||
required => false,
|
|
||||||
desc => ?DESC("s3_exporter_type")
|
|
||||||
}
|
|
||||||
)}
|
|
||||||
] ++
|
|
||||||
emqx_s3_schema:fields(s3);
|
emqx_s3_schema:fields(s3);
|
||||||
fields(local_storage_segments_gc) ->
|
fields(local_storage_segments_gc) ->
|
||||||
[
|
[
|
||||||
|
@ -287,7 +264,16 @@ validator(filename) ->
|
||||||
byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded}
|
byte_size(Bin) =< ?MAX_FILENAME_BYTELEN orelse {error, max_length_exceeded}
|
||||||
end,
|
end,
|
||||||
fun emqx_ft_fs_util:is_filename_safe/1
|
fun emqx_ft_fs_util:is_filename_safe/1
|
||||||
].
|
];
|
||||||
|
validator(backend) ->
|
||||||
|
fun(Config) ->
|
||||||
|
case maps:keys(Config) of
|
||||||
|
[_Type] ->
|
||||||
|
ok;
|
||||||
|
_Conflicts = [_ | _] ->
|
||||||
|
{error, multiple_conflicting_backends}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
converter(checksum) ->
|
converter(checksum) ->
|
||||||
fun
|
fun
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
-export(
|
-export(
|
||||||
[
|
[
|
||||||
child_spec/0,
|
|
||||||
|
|
||||||
store_filemeta/2,
|
store_filemeta/2,
|
||||||
store_segment/2,
|
store_segment/2,
|
||||||
assemble/2,
|
assemble/2,
|
||||||
|
@ -30,11 +28,17 @@
|
||||||
with_storage_type/2,
|
with_storage_type/2,
|
||||||
with_storage_type/3,
|
with_storage_type/3,
|
||||||
|
|
||||||
|
backend/0,
|
||||||
on_config_update/2
|
on_config_update/2
|
||||||
]
|
]
|
||||||
).
|
).
|
||||||
|
|
||||||
-type storage() :: emqx_config:config().
|
-type type() :: local.
|
||||||
|
-type backend() :: {type(), storage()}.
|
||||||
|
-type storage() :: config().
|
||||||
|
-type config() :: emqx_config:config().
|
||||||
|
|
||||||
|
-export_type([backend/0]).
|
||||||
|
|
||||||
-export_type([assemble_callback/0]).
|
-export_type([assemble_callback/0]).
|
||||||
|
|
||||||
|
@ -100,31 +104,20 @@
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec child_spec() ->
|
|
||||||
[supervisor:child_spec()].
|
|
||||||
child_spec() ->
|
|
||||||
try
|
|
||||||
Mod = mod(),
|
|
||||||
Mod:child_spec(storage())
|
|
||||||
catch
|
|
||||||
error:disabled -> [];
|
|
||||||
error:undef -> []
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
|
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
|
||||||
ok | {async, pid()} | {error, term()}.
|
ok | {async, pid()} | {error, term()}.
|
||||||
store_filemeta(Transfer, FileMeta) ->
|
store_filemeta(Transfer, FileMeta) ->
|
||||||
with_storage(store_filemeta, [Transfer, FileMeta]).
|
dispatch(store_filemeta, [Transfer, FileMeta]).
|
||||||
|
|
||||||
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
|
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
|
||||||
ok | {async, pid()} | {error, term()}.
|
ok | {async, pid()} | {error, term()}.
|
||||||
store_segment(Transfer, Segment) ->
|
store_segment(Transfer, Segment) ->
|
||||||
with_storage(store_segment, [Transfer, Segment]).
|
dispatch(store_segment, [Transfer, Segment]).
|
||||||
|
|
||||||
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
|
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
|
||||||
ok | {async, pid()} | {error, term()}.
|
ok | {async, pid()} | {error, term()}.
|
||||||
assemble(Transfer, Size) ->
|
assemble(Transfer, Size) ->
|
||||||
with_storage(assemble, [Transfer, Size]).
|
dispatch(assemble, [Transfer, Size]).
|
||||||
|
|
||||||
-spec files() ->
|
-spec files() ->
|
||||||
{ok, page(file_info(), _)} | {error, term()}.
|
{ok, page(file_info(), _)} | {error, term()}.
|
||||||
|
@ -134,77 +127,75 @@ files() ->
|
||||||
-spec files(query(Cursor)) ->
|
-spec files(query(Cursor)) ->
|
||||||
{ok, page(file_info(), Cursor)} | {error, term()}.
|
{ok, page(file_info(), Cursor)} | {error, term()}.
|
||||||
files(Query) ->
|
files(Query) ->
|
||||||
with_storage(files, [Query]).
|
dispatch(files, [Query]).
|
||||||
|
|
||||||
-spec with_storage(atom() | function()) -> any().
|
-spec dispatch(atom(), list(term())) -> any().
|
||||||
with_storage(Fun) ->
|
dispatch(Fun, Args) when is_atom(Fun) ->
|
||||||
with_storage(Fun, []).
|
case backend() of
|
||||||
|
{Type, Storage} ->
|
||||||
-spec with_storage(atom() | function(), list(term())) -> any().
|
apply(mod(Type), Fun, [Storage | Args]);
|
||||||
with_storage(Fun, Args) ->
|
_ ->
|
||||||
case storage() of
|
|
||||||
Storage = #{} ->
|
|
||||||
apply_storage(Storage, Fun, Args);
|
|
||||||
undefined ->
|
|
||||||
{error, disabled}
|
{error, disabled}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%
|
||||||
|
|
||||||
-spec with_storage_type(atom(), atom() | function()) -> any().
|
-spec with_storage_type(atom(), atom() | function()) -> any().
|
||||||
with_storage_type(Type, Fun) ->
|
with_storage_type(Type, Fun) ->
|
||||||
with_storage_type(Type, Fun, []).
|
with_storage_type(Type, Fun, []).
|
||||||
|
|
||||||
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
|
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
|
||||||
with_storage_type(Type, Fun, Args) ->
|
with_storage_type(Type, Fun, Args) ->
|
||||||
with_storage(fun(Storage) ->
|
case backend() of
|
||||||
case Storage of
|
{Type, Storage} when is_atom(Fun) ->
|
||||||
#{type := Type} ->
|
apply(mod(Type), Fun, [Storage | Args]);
|
||||||
apply_storage(Storage, Fun, Args);
|
{Type, Storage} when is_function(Fun) ->
|
||||||
|
apply(Fun, [Storage | Args]);
|
||||||
|
{_, _} = Backend ->
|
||||||
|
{error, {invalid_storage_backend, Backend}};
|
||||||
_ ->
|
_ ->
|
||||||
{error, {invalid_storage_type, Storage}}
|
{error, disabled}
|
||||||
end
|
end.
|
||||||
end).
|
|
||||||
|
|
||||||
apply_storage(Storage, Fun, Args) when is_atom(Fun) ->
|
|
||||||
apply(mod(Storage), Fun, [Storage | Args]);
|
|
||||||
apply_storage(Storage, Fun, Args) when is_function(Fun) ->
|
|
||||||
apply(Fun, [Storage | Args]).
|
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
|
-spec backend() -> backend().
|
||||||
|
backend() ->
|
||||||
|
backend(emqx_ft_conf:storage()).
|
||||||
|
|
||||||
|
-spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
|
||||||
ok.
|
ok.
|
||||||
on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
|
on_config_update(ConfigOld, ConfigNew) ->
|
||||||
|
on_backend_update(
|
||||||
|
emqx_maybe:apply(fun backend/1, ConfigOld),
|
||||||
|
emqx_maybe:apply(fun backend/1, ConfigNew)
|
||||||
|
).
|
||||||
|
|
||||||
|
on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
|
||||||
ok;
|
ok;
|
||||||
on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
|
on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
|
||||||
ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
|
ok = (mod(Type)):on_config_update(StorageOld, StorageNew);
|
||||||
on_config_update(StorageOld, StorageNew) when
|
on_backend_update(BackendOld, BackendNew) when
|
||||||
(StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
|
(BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
|
||||||
(StorageNew =:= undefined orelse is_map_key(type, StorageNew))
|
(BackendNew =:= undefined orelse is_tuple(BackendNew))
|
||||||
->
|
->
|
||||||
_ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
|
_ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld),
|
||||||
_ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
|
_ = emqx_maybe:apply(fun on_storage_start/1, BackendNew),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Local API
|
%% Local API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
on_storage_start(Storage) ->
|
-spec backend(config()) -> backend().
|
||||||
(mod(Storage)):start(Storage).
|
backend(#{local := Storage}) ->
|
||||||
|
{local, Storage}.
|
||||||
|
|
||||||
on_storage_stop(Storage) ->
|
on_storage_start({Type, Storage}) ->
|
||||||
(mod(Storage)):stop(Storage).
|
(mod(Type)):start(Storage).
|
||||||
|
|
||||||
storage() ->
|
on_storage_stop({Type, Storage}) ->
|
||||||
emqx_ft_conf:storage().
|
(mod(Type)):stop(Storage).
|
||||||
|
|
||||||
mod() ->
|
mod(local) ->
|
||||||
mod(storage()).
|
emqx_ft_storage_fs.
|
||||||
|
|
||||||
mod(Storage) ->
|
|
||||||
case Storage of
|
|
||||||
#{type := local} ->
|
|
||||||
emqx_ft_storage_fs;
|
|
||||||
undefined ->
|
|
||||||
error(disabled)
|
|
||||||
end.
|
|
||||||
|
|
|
@ -169,15 +169,12 @@ stop({ExporterMod, ExporterOpts}) ->
|
||||||
|
|
||||||
exporter(Storage) ->
|
exporter(Storage) ->
|
||||||
case maps:get(exporter, Storage) of
|
case maps:get(exporter, Storage) of
|
||||||
#{type := local} = Options ->
|
#{local := Options} ->
|
||||||
{emqx_ft_storage_exporter_fs, without_type(Options)};
|
{emqx_ft_storage_exporter_fs, Options};
|
||||||
#{type := s3} = Options ->
|
#{s3 := Options} ->
|
||||||
{emqx_ft_storage_exporter_s3, without_type(Options)}
|
{emqx_ft_storage_exporter_s3, Options}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
without_type(#{type := _} = Options) ->
|
|
||||||
maps:without([type], Options).
|
|
||||||
|
|
||||||
init_checksum(#{checksum := {Algo, _}}) ->
|
init_checksum(#{checksum := {Algo, _}}) ->
|
||||||
crypto:hash_init(Algo);
|
crypto:hash_init(Algo);
|
||||||
init_checksum(#{}) ->
|
init_checksum(#{}) ->
|
||||||
|
|
|
@ -58,9 +58,9 @@ start_link(Storage) ->
|
||||||
collect() ->
|
collect() ->
|
||||||
gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
|
gen_server:call(mk_server_ref(global), {collect, erlang:system_time()}, infinity).
|
||||||
|
|
||||||
-spec reset() -> ok.
|
-spec reset() -> ok | {error, _}.
|
||||||
reset() ->
|
reset() ->
|
||||||
reset(emqx_ft_conf:storage()).
|
emqx_ft_storage:with_storage_type(local, fun reset/1).
|
||||||
|
|
||||||
-spec reset(emqx_ft_storage_fs:storage()) -> ok.
|
-spec reset(emqx_ft_storage_fs:storage()) -> ok.
|
||||||
reset(Storage) ->
|
reset(Storage) ->
|
||||||
|
@ -139,7 +139,8 @@ maybe_report(#gcstats{} = _Stats, _Storage) ->
|
||||||
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
|
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
|
||||||
|
|
||||||
start_timer(St) ->
|
start_timer(St) ->
|
||||||
start_timer(gc_interval(emqx_ft_conf:storage()), St).
|
Interval = emqx_ft_storage:with_storage_type(local, fun gc_interval/1),
|
||||||
|
start_timer(Interval, St).
|
||||||
|
|
||||||
start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
|
start_timer(Interval, St = #st{next_gc_timer = undefined}) when ?IS_ENABLED(Interval) ->
|
||||||
St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
|
St#st{next_gc_timer = emqx_utils:start_timer(Interval, collect)};
|
||||||
|
|
|
@ -58,16 +58,16 @@ end_per_suite(_Config) ->
|
||||||
set_special_configs(Config) ->
|
set_special_configs(Config) ->
|
||||||
fun
|
fun
|
||||||
(emqx_ft) ->
|
(emqx_ft) ->
|
||||||
Storage = emqx_ft_test_helpers:local_storage(Config),
|
|
||||||
emqx_ft_test_helpers:load_config(#{
|
|
||||||
% NOTE
|
% NOTE
|
||||||
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
||||||
% complete transfers.
|
% complete transfers.
|
||||||
enable => true,
|
Storage = emqx_utils_maps:deep_merge(
|
||||||
storage => emqx_utils_maps:deep_merge(
|
emqx_ft_test_helpers:local_storage(Config),
|
||||||
Storage,
|
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}}
|
||||||
#{segments => #{gc => #{interval => 0}}}
|
),
|
||||||
)
|
emqx_ft_test_helpers:load_config(#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"storage">> => Storage
|
||||||
});
|
});
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -246,17 +246,20 @@ exporter(Config) ->
|
||||||
emqx_ft_storage_exporter:exporter(storage(Config)).
|
emqx_ft_storage_exporter:exporter(storage(Config)).
|
||||||
|
|
||||||
storage(Config) ->
|
storage(Config) ->
|
||||||
maps:get(
|
emqx_utils_maps:deep_get(
|
||||||
storage,
|
[storage, local],
|
||||||
emqx_ft_schema:translate(#{
|
emqx_ft_schema:translate(#{
|
||||||
<<"storage">> => #{
|
<<"storage">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"segments">> => #{
|
<<"segments">> => #{
|
||||||
<<"root">> => ?config(storage_root, Config)
|
<<"root">> => ?config(storage_root, Config)
|
||||||
},
|
},
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
|
<<"local">> => #{
|
||||||
<<"root">> => ?config(exports_root, Config)
|
<<"root">> => ?config(exports_root, Config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
).
|
).
|
||||||
|
|
|
@ -34,7 +34,12 @@ end_per_suite(_Config) ->
|
||||||
init_per_testcase(_Case, Config) ->
|
init_per_testcase(_Case, Config) ->
|
||||||
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
|
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
|
[emqx_conf, emqx_ft], fun
|
||||||
|
(emqx_ft) ->
|
||||||
|
emqx_ft_test_helpers:load_config(#{});
|
||||||
|
(_) ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
),
|
),
|
||||||
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
|
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
|
||||||
Config.
|
Config.
|
||||||
|
@ -52,7 +57,7 @@ t_update_config(_Config) ->
|
||||||
{error, #{kind := validation_error}},
|
{error, #{kind := validation_error}},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer],
|
[file_transfer],
|
||||||
#{<<"storage">> => #{<<"type">> => <<"unknown">>}},
|
#{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
@ -63,7 +68,7 @@ t_update_config(_Config) ->
|
||||||
#{
|
#{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"storage">> => #{
|
<<"storage">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"segments">> => #{
|
<<"segments">> => #{
|
||||||
<<"root">> => <<"/tmp/path">>,
|
<<"root">> => <<"/tmp/path">>,
|
||||||
<<"gc">> => #{
|
<<"gc">> => #{
|
||||||
|
@ -71,21 +76,27 @@ t_update_config(_Config) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"root">> => <<"/tmp/exports">>
|
<<"root">> => <<"/tmp/exports">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
<<"/tmp/path">>,
|
<<"/tmp/path">>,
|
||||||
emqx_config:get([file_transfer, storage, segments, root])
|
emqx_config:get([file_transfer, storage, local, segments, root])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
5 * 60 * 1000,
|
5 * 60 * 1000,
|
||||||
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
|
emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
{5 * 60, 24 * 60 * 60},
|
||||||
|
emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:segments_ttl/1)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_disable_restore_config(Config) ->
|
t_disable_restore_config(Config) ->
|
||||||
|
@ -93,13 +104,13 @@ t_disable_restore_config(Config) ->
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer],
|
[file_transfer],
|
||||||
#{<<"enable">> => true, <<"storage">> => #{<<"type">> => <<"local">>}},
|
#{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
60 * 60 * 1000,
|
60 * 60 * 1000,
|
||||||
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
|
emqx_ft_storage:with_storage_type(local, fun emqx_ft_conf:gc_interval/1)
|
||||||
),
|
),
|
||||||
% Verify that transfers work
|
% Verify that transfers work
|
||||||
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
|
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>),
|
||||||
|
@ -117,7 +128,7 @@ t_disable_restore_config(Config) ->
|
||||||
emqx_ft_conf:enabled()
|
emqx_ft_conf:enabled()
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{type := local, exporter := #{type := local}},
|
#{local := #{exporter := #{local := _}}},
|
||||||
emqx_ft_conf:storage()
|
emqx_ft_conf:storage()
|
||||||
),
|
),
|
||||||
ClientId = gen_clientid(),
|
ClientId = gen_clientid(),
|
||||||
|
@ -147,12 +158,13 @@ t_disable_restore_config(Config) ->
|
||||||
#{
|
#{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"storage">> => #{
|
<<"storage">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"segments">> => #{
|
<<"segments">> => #{
|
||||||
<<"root">> => Root,
|
<<"root">> => Root,
|
||||||
<<"gc">> => #{<<"interval">> => <<"1s">>}
|
<<"gc">> => #{<<"interval">> => <<"1s">>}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
|
@ -165,10 +177,7 @@ t_disable_restore_config(Config) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
?snk_kind := garbage_collection,
|
?snk_kind := garbage_collection,
|
||||||
storage := #{
|
storage := #{segments := #{root := Root}}
|
||||||
type := local,
|
|
||||||
segments := #{root := Root}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
?of_kind(garbage_collection, Trace)
|
?of_kind(garbage_collection, Trace)
|
||||||
|
@ -188,48 +197,49 @@ t_switch_exporter(_Config) ->
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{type := local, exporter := #{type := local}},
|
#{local := #{exporter := #{local := _}}},
|
||||||
emqx_ft_conf:storage()
|
emqx_ft_conf:storage()
|
||||||
),
|
),
|
||||||
% Verify that switching to a different exporter works
|
% Verify that switching to a different exporter works
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer, storage, exporter],
|
[file_transfer, storage, local, exporter],
|
||||||
#{
|
#{
|
||||||
<<"type">> => <<"s3">>,
|
<<"s3">> => #{
|
||||||
<<"bucket">> => <<"emqx">>,
|
<<"bucket">> => <<"emqx">>,
|
||||||
<<"host">> => <<"https://localhost">>,
|
<<"host">> => <<"https://localhost">>,
|
||||||
<<"port">> => 9000,
|
<<"port">> => 9000,
|
||||||
<<"transport_options">> => #{
|
<<"transport_options">> => #{
|
||||||
<<"ipv6_probe">> => false
|
<<"ipv6_probe">> => false
|
||||||
}
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{type := local, exporter := #{type := s3}},
|
#{local := #{exporter := #{s3 := _}}},
|
||||||
emqx_ft_conf:storage()
|
emqx_ft_conf:storage()
|
||||||
),
|
),
|
||||||
% Verify that switching back to local exporter works
|
% Verify that switching back to local exporter works
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:remove(
|
emqx_conf:remove(
|
||||||
[file_transfer, storage, exporter],
|
[file_transfer, storage, local, exporter],
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer, storage, exporter],
|
[file_transfer, storage, local, exporter],
|
||||||
#{<<"type">> => <<"local">>},
|
#{<<"local">> => #{}},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{type := local, exporter := #{type := local}},
|
#{local := #{exporter := #{local := #{}}}},
|
||||||
emqx_ft_conf:storage()
|
emqx_ft_conf:storage()
|
||||||
),
|
),
|
||||||
% Verify that transfers work
|
% Verify that transfers work
|
||||||
|
|
|
@ -85,7 +85,7 @@ client_id(Config) ->
|
||||||
|
|
||||||
storage(Config) ->
|
storage(Config) ->
|
||||||
RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
|
RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
|
||||||
#{storage := Storage} = emqx_ft_schema:translate(RawConfig),
|
#{storage := #{local := Storage}} = emqx_ft_schema:translate(RawConfig),
|
||||||
Storage.
|
Storage.
|
||||||
|
|
||||||
list_files(Config) ->
|
list_files(Config) ->
|
||||||
|
|
|
@ -36,19 +36,19 @@ end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(TC, Config) ->
|
init_per_testcase(TC, Config) ->
|
||||||
|
SegmentsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, segments]),
|
||||||
|
ExportsRoot = emqx_ft_test_helpers:root(Config, node(), [TC, exports]),
|
||||||
ok = emqx_common_test_helpers:start_app(
|
ok = emqx_common_test_helpers:start_app(
|
||||||
emqx_ft,
|
emqx_ft,
|
||||||
fun(emqx_ft) ->
|
fun(emqx_ft) ->
|
||||||
emqx_ft_test_helpers:load_config(#{
|
emqx_ft_test_helpers:load_config(#{
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"storage">> => #{
|
<<"storage">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"segments">> => #{
|
<<"segments">> => #{<<"root">> => SegmentsRoot},
|
||||||
<<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
|
|
||||||
},
|
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{<<"root">> => ExportsRoot}
|
||||||
<<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -105,7 +105,7 @@ t_gc_triggers_manually(_Config) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_gc_complete_transfers(_Config) ->
|
t_gc_complete_transfers(_Config) ->
|
||||||
Storage = emqx_ft_conf:storage(),
|
{local, Storage} = emqx_ft_storage:backend(),
|
||||||
ok = set_gc_config(minimum_segments_ttl, 0),
|
ok = set_gc_config(minimum_segments_ttl, 0),
|
||||||
ok = set_gc_config(maximum_segments_ttl, 3),
|
ok = set_gc_config(maximum_segments_ttl, 3),
|
||||||
ok = set_gc_config(interval, 500),
|
ok = set_gc_config(interval, 500),
|
||||||
|
@ -198,7 +198,7 @@ t_gc_complete_transfers(_Config) ->
|
||||||
t_gc_incomplete_transfers(_Config) ->
|
t_gc_incomplete_transfers(_Config) ->
|
||||||
ok = set_gc_config(minimum_segments_ttl, 0),
|
ok = set_gc_config(minimum_segments_ttl, 0),
|
||||||
ok = set_gc_config(maximum_segments_ttl, 4),
|
ok = set_gc_config(maximum_segments_ttl, 4),
|
||||||
Storage = emqx_ft_conf:storage(),
|
{local, Storage} = emqx_ft_storage:backend(),
|
||||||
Transfers = [
|
Transfers = [
|
||||||
{
|
{
|
||||||
{<<"client43"/utf8>>, <<"file-🦕"/utf8>>},
|
{<<"client43"/utf8>>, <<"file-🦕"/utf8>>},
|
||||||
|
@ -269,7 +269,7 @@ t_gc_incomplete_transfers(_Config) ->
|
||||||
t_gc_handling_errors(_Config) ->
|
t_gc_handling_errors(_Config) ->
|
||||||
ok = set_gc_config(minimum_segments_ttl, 0),
|
ok = set_gc_config(minimum_segments_ttl, 0),
|
||||||
ok = set_gc_config(maximum_segments_ttl, 0),
|
ok = set_gc_config(maximum_segments_ttl, 0),
|
||||||
Storage = emqx_ft_conf:storage(),
|
{local, Storage} = emqx_ft_storage:backend(),
|
||||||
Transfer1 = {<<"client1">>, mk_file_id()},
|
Transfer1 = {<<"client1">>, mk_file_id()},
|
||||||
Transfer2 = {<<"client2">>, mk_file_id()},
|
Transfer2 = {<<"client2">>, mk_file_id()},
|
||||||
Filemeta = #{name => "oops.pdf"},
|
Filemeta = #{name => "oops.pdf"},
|
||||||
|
@ -325,7 +325,7 @@ t_gc_handling_errors(_Config) ->
|
||||||
%%
|
%%
|
||||||
|
|
||||||
set_gc_config(Name, Value) ->
|
set_gc_config(Name, Value) ->
|
||||||
emqx_config:put([file_transfer, storage, segments, gc, Name], Value).
|
emqx_config:put([file_transfer, storage, local, segments, gc, Name], Value).
|
||||||
|
|
||||||
start_transfer(Storage, {Transfer, Meta, Gen}) ->
|
start_transfer(Storage, {Transfer, Meta, Gen}) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
|
|
@ -54,20 +54,22 @@ local_storage(Config) ->
|
||||||
|
|
||||||
local_storage(Config, Opts) ->
|
local_storage(Config, Opts) ->
|
||||||
#{
|
#{
|
||||||
<<"type">> => <<"local">>,
|
<<"local">> => #{
|
||||||
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
|
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
|
||||||
<<"exporter">> => exporter(Config, Opts)
|
<<"exporter">> => exporter(Config, Opts)
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
exporter(Config, #{exporter := local}) ->
|
exporter(Config, #{exporter := local}) ->
|
||||||
#{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
|
#{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}};
|
||||||
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
|
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
|
||||||
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
|
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
|
||||||
BaseConfig#{
|
#{
|
||||||
|
<<"s3">> => BaseConfig#{
|
||||||
<<"bucket">> => list_to_binary(BucketName),
|
<<"bucket">> => list_to_binary(BucketName),
|
||||||
<<"type">> => <<"s3">>,
|
|
||||||
<<"host">> => ?S3_HOST,
|
<<"host">> => ?S3_HOST,
|
||||||
<<"port">> => ?S3_PORT
|
<<"port">> => ?S3_PORT
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
load_config(Config) ->
|
load_config(Config) ->
|
||||||
|
|
|
@ -18,11 +18,11 @@ store_segment_timeout.desc:
|
||||||
"""Timeout for storing a file segment.<br/>
|
"""Timeout for storing a file segment.<br/>
|
||||||
After reaching the timeout, message with the segment will be acked with an error"""
|
After reaching the timeout, message with the segment will be acked with an error"""
|
||||||
|
|
||||||
storage.desc:
|
storage_backend.desc:
|
||||||
"""Storage settings for file transfer."""
|
"""Storage settings for file transfer."""
|
||||||
|
|
||||||
local_type.desc:
|
local_storage.desc:
|
||||||
"""Use local file system to store uploaded fragments and temporary data."""
|
"""Local file system backend to store uploaded fragments and temporary data."""
|
||||||
|
|
||||||
local_storage_segments.desc:
|
local_storage_segments.desc:
|
||||||
"""Settings for local segments storage, which include uploaded transfer fragments and temporary data."""
|
"""Settings for local segments storage, which include uploaded transfer fragments and temporary data."""
|
||||||
|
@ -30,15 +30,15 @@ local_storage_segments.desc:
|
||||||
local_storage_segments_root.desc:
|
local_storage_segments_root.desc:
|
||||||
"""File system path to keep uploaded fragments and temporary data."""
|
"""File system path to keep uploaded fragments and temporary data."""
|
||||||
|
|
||||||
local_storage_exporter.desc:
|
local_storage_exporter_backend.desc:
|
||||||
"""Exporter for the local file system storage backend.<br/>
|
"""Exporter for the local file system storage backend.<br/>
|
||||||
Exporter defines where and how fully transferred and assembled files are stored."""
|
Exporter defines where and how fully transferred and assembled files are stored."""
|
||||||
|
|
||||||
local_storage_exporter_type.desc:
|
local_storage_exporter.desc:
|
||||||
"""Exporter type for the exporter to the local file system"""
|
"""Exporter to the local file system."""
|
||||||
|
|
||||||
s3_exporter_type.desc:
|
s3_exporter.desc:
|
||||||
"""Exporter type for the exporter to S3"""
|
"""Exporter to the S3 API compatible object storage."""
|
||||||
|
|
||||||
local_storage_exporter_root.desc:
|
local_storage_exporter_root.desc:
|
||||||
"""File system path to keep uploaded files."""
|
"""File system path to keep uploaded files."""
|
||||||
|
|
Loading…
Reference in New Issue