feat(ft-s3): integrate exporter configs
This commit is contained in:
parent
8361223648
commit
5ac3543a76
|
@ -75,6 +75,10 @@ assert_storage(Type) ->
|
||||||
|
|
||||||
-spec load() -> ok.
|
-spec load() -> ok.
|
||||||
load() ->
|
load() ->
|
||||||
|
ok = emqx_ft_storage_exporter:update_exporter(
|
||||||
|
undefined,
|
||||||
|
emqx_config:get([file_transfer, storage])
|
||||||
|
),
|
||||||
emqx_conf:add_handler([file_transfer], ?MODULE).
|
emqx_conf:add_handler([file_transfer], ?MODULE).
|
||||||
|
|
||||||
-spec unload() -> ok.
|
-spec unload() -> ok.
|
||||||
|
@ -98,5 +102,7 @@ pre_config_update(_, Req, _Config) ->
|
||||||
emqx_config:app_envs()
|
emqx_config:app_envs()
|
||||||
) ->
|
) ->
|
||||||
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
||||||
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
|
post_config_update(_Path, _Req, NewConfig, OldConfig, _AppEnvs) ->
|
||||||
ok.
|
OldStorageConfig = maps:get(storage, OldConfig, undefined),
|
||||||
|
NewStorageConfig = maps:get(storage, NewConfig, undefined),
|
||||||
|
emqx_ft_storage_exporter:update_exporter(OldStorageConfig, NewStorageConfig).
|
||||||
|
|
|
@ -29,9 +29,11 @@
|
||||||
|
|
||||||
%% Listing API
|
%% Listing API
|
||||||
-export([list/1]).
|
-export([list/1]).
|
||||||
% TODO
|
|
||||||
% -export([list/2]).
|
|
||||||
|
|
||||||
|
%% Lifecycle API
|
||||||
|
-export([update_exporter/2]).
|
||||||
|
|
||||||
|
%% Internal API
|
||||||
-export([exporter/1]).
|
-export([exporter/1]).
|
||||||
|
|
||||||
-export_type([export/0]).
|
-export_type([export/0]).
|
||||||
|
@ -71,6 +73,17 @@
|
||||||
-callback list(storage()) ->
|
-callback list(storage()) ->
|
||||||
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
|
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
|
||||||
|
|
||||||
|
%% Lifecycle callbacks
|
||||||
|
|
||||||
|
-callback start(exporter_conf()) ->
|
||||||
|
ok | {error, _Reason}.
|
||||||
|
|
||||||
|
-callback stop(exporter_conf()) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-callback update(exporter_conf(), exporter_conf()) ->
|
||||||
|
ok | {error, _Reason}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -126,6 +139,32 @@ list(Storage) ->
|
||||||
{ExporterMod, ExporterOpts} = exporter(Storage),
|
{ExporterMod, ExporterOpts} = exporter(Storage),
|
||||||
ExporterMod:list(ExporterOpts).
|
ExporterMod:list(ExporterOpts).
|
||||||
|
|
||||||
|
%% Lifecycle
|
||||||
|
|
||||||
|
-spec update_exporter(emqx_config:config(), emqx_config:config()) -> ok | {error, term()}.
|
||||||
|
update_exporter(
|
||||||
|
#{exporter := #{type := OldType}} = OldConfig,
|
||||||
|
#{exporter := #{type := OldType}} = NewConfig
|
||||||
|
) ->
|
||||||
|
{ExporterMod, OldExporterOpts} = exporter(OldConfig),
|
||||||
|
{_NewExporterMod, NewExporterOpts} = exporter(NewConfig),
|
||||||
|
ExporterMod:update(OldExporterOpts, NewExporterOpts);
|
||||||
|
update_exporter(
|
||||||
|
#{exporter := _} = OldConfig,
|
||||||
|
#{exporter := _} = NewConfig
|
||||||
|
) ->
|
||||||
|
{OldExporterMod, OldExporterOpts} = exporter(OldConfig),
|
||||||
|
{NewExporterMod, NewExporterOpts} = exporter(NewConfig),
|
||||||
|
ok = OldExporterMod:stop(OldExporterOpts),
|
||||||
|
NewExporterMod:start(NewExporterOpts);
|
||||||
|
update_exporter(undefined, NewConfig) ->
|
||||||
|
{ExporterMod, ExporterOpts} = exporter(NewConfig),
|
||||||
|
ExporterMod:start(ExporterOpts);
|
||||||
|
update_exporter(OldConfig, undefined) ->
|
||||||
|
{ExporterMod, ExporterOpts} = exporter(OldConfig),
|
||||||
|
ExporterMod:stop(ExporterOpts);
|
||||||
|
update_exporter(_, _) ->
|
||||||
|
ok.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -138,7 +177,6 @@ exporter(Storage) ->
|
||||||
{emqx_ft_storage_exporter_s3, without_type(Options)}
|
{emqx_ft_storage_exporter_s3, without_type(Options)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec without_type(exporter_conf()) -> exporter_conf().
|
|
||||||
without_type(#{type := _} = Options) ->
|
without_type(#{type := _} = Options) ->
|
||||||
maps:without([type], Options).
|
maps:without([type], Options).
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,12 @@
|
||||||
-export([discard/1]).
|
-export([discard/1]).
|
||||||
-export([list/1]).
|
-export([list/1]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
start/1,
|
||||||
|
stop/1,
|
||||||
|
update/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% Internal API for RPC
|
%% Internal API for RPC
|
||||||
-export([list_local/1]).
|
-export([list_local/1]).
|
||||||
-export([list_local/2]).
|
-export([list_local/2]).
|
||||||
|
@ -88,7 +94,9 @@
|
||||||
})
|
})
|
||||||
).
|
).
|
||||||
|
|
||||||
%%
|
%%--------------------------------------------------------------------
|
||||||
|
%% Exporter behaviour
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec start_export(options(), transfer(), filemeta()) ->
|
-spec start_export(options(), transfer(), filemeta()) ->
|
||||||
{ok, export_st()} | {error, file_error()}.
|
{ok, export_st()} | {error, file_error()}.
|
||||||
|
@ -142,7 +150,25 @@ discard(#{path := Filepath, handle := Handle}) ->
|
||||||
ok = file:close(Handle),
|
ok = file:close(Handle),
|
||||||
file:delete(Filepath).
|
file:delete(Filepath).
|
||||||
|
|
||||||
%%
|
%%--------------------------------------------------------------------
|
||||||
|
%% Exporter behaviour (lifecycle)
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% FS Exporter does not have require any stateful entities,
|
||||||
|
%% so lifecycle callbacks are no-op.
|
||||||
|
|
||||||
|
-spec start(options()) -> ok.
|
||||||
|
start(_Options) -> ok.
|
||||||
|
|
||||||
|
-spec stop(options()) -> ok.
|
||||||
|
stop(_Options) -> ok.
|
||||||
|
|
||||||
|
-spec update(options(), options()) -> ok.
|
||||||
|
update(_OldOptions, _NewOptions) -> ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec list_local(options(), transfer()) ->
|
-spec list_local(options(), transfer()) ->
|
||||||
{ok, [exportinfo(), ...]} | {error, file_error()}.
|
{ok, [exportinfo(), ...]} | {error, file_error()}.
|
||||||
|
@ -199,6 +225,10 @@ list_local(Options) ->
|
||||||
Pattern
|
Pattern
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Helpers
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
filter_manifest(?MANIFEST) ->
|
filter_manifest(?MANIFEST) ->
|
||||||
% Filename equals `?MANIFEST`, there should also be a manifest for it.
|
% Filename equals `?MANIFEST`, there should also be a manifest for it.
|
||||||
false;
|
false;
|
|
@ -25,6 +25,12 @@
|
||||||
-export([discard/1]).
|
-export([discard/1]).
|
||||||
-export([list/1]).
|
-export([list/1]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
start/1,
|
||||||
|
stop/1,
|
||||||
|
update/2
|
||||||
|
]).
|
||||||
|
|
||||||
-type options() :: emqx_s3:profile_config().
|
-type options() :: emqx_s3:profile_config().
|
||||||
-type transfer() :: emqx_ft:transfer().
|
-type transfer() :: emqx_ft:transfer().
|
||||||
-type filemeta() :: emqx_ft:filemeta().
|
-type filemeta() :: emqx_ft:filemeta().
|
||||||
|
@ -42,6 +48,10 @@
|
||||||
meta := filemeta()
|
meta := filemeta()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Exporter behaviour
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec start_export(options(), transfer(), filemeta()) ->
|
-spec start_export(options(), transfer(), filemeta()) ->
|
||||||
{ok, export_st()} | {error, term()}.
|
{ok, export_st()} | {error, term()}.
|
||||||
start_export(_Options, _Transfer, Filemeta = #{name := Filename}) ->
|
start_export(_Options, _Transfer, Filemeta = #{name := Filename}) ->
|
||||||
|
@ -67,3 +77,26 @@ discard(_ExportSt) ->
|
||||||
{ok, [exportinfo()]} | {error, term()}.
|
{ok, [exportinfo()]} | {error, term()}.
|
||||||
list(_Options) ->
|
list(_Options) ->
|
||||||
{ok, []}.
|
{ok, []}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Exporter behaviour (lifecycle)
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec start(options()) -> ok | {error, term()}.
|
||||||
|
start(Options) ->
|
||||||
|
emqx_s3:start_profile(s3_profile_id(), Options).
|
||||||
|
|
||||||
|
-spec stop(options()) -> ok.
|
||||||
|
stop(_Options) ->
|
||||||
|
ok = emqx_s3:stop_profile(s3_profile_id()).
|
||||||
|
|
||||||
|
-spec update(options(), options()) -> ok.
|
||||||
|
update(_OldOptions, NewOptions) ->
|
||||||
|
emqx_s3:update_profile(s3_profile_id(), NewOptions).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
s3_profile_id() ->
|
||||||
|
atom_to_binary(?MODULE).
|
Loading…
Reference in New Issue