Merge pull request #10974 from keynslug/ft/EMQX-9523/backend-enable-flag
fix(ft): add `enable` flag to every backend
This commit is contained in:
commit
c2635e938b
|
@ -29,6 +29,7 @@ file_transfer {
|
||||||
enable = true
|
enable = true
|
||||||
storage {
|
storage {
|
||||||
local {
|
local {
|
||||||
|
enable = true
|
||||||
exporter {
|
exporter {
|
||||||
local { root = "/var/lib/emqx/transfers" }
|
local { root = "/var/lib/emqx/transfers" }
|
||||||
}
|
}
|
||||||
|
@ -50,7 +51,9 @@ file_transfer {
|
||||||
enable = true
|
enable = true
|
||||||
storage {
|
storage {
|
||||||
local {
|
local {
|
||||||
|
enable = true
|
||||||
exporter {
|
exporter {
|
||||||
|
enable = true
|
||||||
s3 {
|
s3 {
|
||||||
host = "s3.us-east-1.amazonaws.com"
|
host = "s3.us-east-1.amazonaws.com"
|
||||||
port = "443"
|
port = "443"
|
||||||
|
|
|
@ -25,6 +25,10 @@
|
||||||
|
|
||||||
-export([schema/1]).
|
-export([schema/1]).
|
||||||
|
|
||||||
|
%% Utilities
|
||||||
|
-export([backend/1]).
|
||||||
|
|
||||||
|
%% Test-only helpers
|
||||||
-export([translate/1]).
|
-export([translate/1]).
|
||||||
|
|
||||||
-type json_value() ::
|
-type json_value() ::
|
||||||
|
@ -142,7 +146,7 @@ fields(local_storage) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
] ++ common_backend_fields();
|
||||||
fields(local_storage_segments) ->
|
fields(local_storage_segments) ->
|
||||||
[
|
[
|
||||||
{root,
|
{root,
|
||||||
|
@ -190,9 +194,9 @@ fields(local_storage_exporter) ->
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
] ++ common_backend_fields();
|
||||||
fields(s3_exporter) ->
|
fields(s3_exporter) ->
|
||||||
emqx_s3_schema:fields(s3);
|
emqx_s3_schema:fields(s3) ++ common_backend_fields();
|
||||||
fields(local_storage_segments_gc) ->
|
fields(local_storage_segments_gc) ->
|
||||||
[
|
[
|
||||||
{interval,
|
{interval,
|
||||||
|
@ -229,6 +233,18 @@ fields(local_storage_segments_gc) ->
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
common_backend_fields() ->
|
||||||
|
[
|
||||||
|
{enable,
|
||||||
|
mk(
|
||||||
|
boolean(), #{
|
||||||
|
desc => ?DESC("backend_enable"),
|
||||||
|
required => false,
|
||||||
|
default => true
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
desc(file_transfer) ->
|
desc(file_transfer) ->
|
||||||
"File transfer settings";
|
"File transfer settings";
|
||||||
desc(local_storage) ->
|
desc(local_storage) ->
|
||||||
|
@ -275,11 +291,14 @@ validator(filename) ->
|
||||||
];
|
];
|
||||||
validator(backend) ->
|
validator(backend) ->
|
||||||
fun(Config) ->
|
fun(Config) ->
|
||||||
case maps:keys(Config) of
|
Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
|
||||||
[_Type] ->
|
case maps:to_list(Enabled) of
|
||||||
|
[{_Type, _BackendConfig}] ->
|
||||||
ok;
|
ok;
|
||||||
_Conflicts = [_ | _] ->
|
_Conflicts = [_ | _] ->
|
||||||
{error, multiple_conflicting_backends}
|
{error, multiple_enabled_backends};
|
||||||
|
_None = [] ->
|
||||||
|
{error, no_enabled_backend}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -311,11 +330,24 @@ converter(unicode_string) ->
|
||||||
ref(Ref) ->
|
ref(Ref) ->
|
||||||
ref(?MODULE, Ref).
|
ref(?MODULE, Ref).
|
||||||
|
|
||||||
|
%% Utilities
|
||||||
|
|
||||||
|
-spec backend(emqx_config:config()) ->
|
||||||
|
{_Type :: atom(), emqx_config:config()}.
|
||||||
|
backend(Config) ->
|
||||||
|
catch maps:foreach(fun emit_enabled/2, Config).
|
||||||
|
|
||||||
|
-spec emit_enabled(atom(), emqx_config:config()) ->
|
||||||
|
no_return().
|
||||||
|
emit_enabled(Type, BConf = #{enable := Enabled}) ->
|
||||||
|
Enabled andalso throw({Type, BConf}).
|
||||||
|
|
||||||
|
%% Test-only helpers
|
||||||
|
|
||||||
|
-spec translate(emqx_config:raw_config()) ->
|
||||||
|
emqx_config:config().
|
||||||
translate(Conf) ->
|
translate(Conf) ->
|
||||||
[Root] = roots(),
|
[Root] = roots(),
|
||||||
maps:get(
|
RootRaw = atom_to_binary(Root),
|
||||||
Root,
|
ConfChecked = hocon_tconf:check_plain(?MODULE, #{RootRaw => Conf}, #{}, [Root]),
|
||||||
hocon_tconf:check_plain(
|
emqx_utils_maps:unsafe_atom_key_map(maps:get(RootRaw, ConfChecked)).
|
||||||
?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root]
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
|
@ -182,8 +182,8 @@ on_backend_update(BackendOld, BackendNew) when
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec backend(config()) -> backend().
|
-spec backend(config()) -> backend().
|
||||||
backend(#{local := Storage}) ->
|
backend(Config) ->
|
||||||
{local, Storage}.
|
emqx_ft_schema:backend(Config).
|
||||||
|
|
||||||
on_storage_start({Type, Storage}) ->
|
on_storage_start({Type, Storage}) ->
|
||||||
(mod(Type)):start(Storage).
|
(mod(Type)):start(Storage).
|
||||||
|
|
|
@ -175,10 +175,10 @@ stop({ExporterMod, ExporterOpts}) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
exporter(Storage) ->
|
exporter(Storage) ->
|
||||||
case maps:get(exporter, Storage) of
|
case emqx_ft_schema:backend(maps:get(exporter, Storage)) of
|
||||||
#{local := Options} ->
|
{local, Options} ->
|
||||||
{emqx_ft_storage_exporter_fs, Options};
|
{emqx_ft_storage_exporter_fs, Options};
|
||||||
#{s3 := Options} ->
|
{s3, Options} ->
|
||||||
{emqx_ft_storage_exporter_s3, Options}
|
{emqx_ft_storage_exporter_s3, Options}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@
|
||||||
-export_type([options/0]).
|
-export_type([options/0]).
|
||||||
|
|
||||||
-type options() :: #{
|
-type options() :: #{
|
||||||
|
enable := true,
|
||||||
root => file:name(),
|
root => file:name(),
|
||||||
_ => _
|
_ => _
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -102,6 +102,7 @@
|
||||||
|
|
||||||
-type storage() :: #{
|
-type storage() :: #{
|
||||||
type := 'local',
|
type := 'local',
|
||||||
|
enable := true,
|
||||||
segments := segments(),
|
segments := segments(),
|
||||||
exporter := emqx_ft_storage_exporter:exporter()
|
exporter := emqx_ft_storage_exporter:exporter()
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -256,6 +256,7 @@ storage(Config) ->
|
||||||
},
|
},
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
<<"local">> => #{
|
<<"local">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"root">> => ?config(exports_root, Config)
|
<<"root">> => ?config(exports_root, Config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,7 @@ t_update_config(_Config) ->
|
||||||
},
|
},
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
<<"local">> => #{
|
<<"local">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"root">> => <<"/tmp/exports">>
|
<<"root">> => <<"/tmp/exports">>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -104,7 +105,10 @@ t_disable_restore_config(Config) ->
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer],
|
[file_transfer],
|
||||||
#{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}},
|
#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"storage">> => #{<<"local">> => #{}}
|
||||||
|
},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
@ -207,6 +211,7 @@ t_switch_exporter(_Config) ->
|
||||||
[file_transfer, storage, local, exporter],
|
[file_transfer, storage, local, exporter],
|
||||||
#{
|
#{
|
||||||
<<"s3">> => #{
|
<<"s3">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"bucket">> => <<"emqx">>,
|
<<"bucket">> => <<"emqx">>,
|
||||||
<<"host">> => <<"https://localhost">>,
|
<<"host">> => <<"https://localhost">>,
|
||||||
<<"port">> => 9000,
|
<<"port">> => 9000,
|
||||||
|
@ -234,7 +239,7 @@ t_switch_exporter(_Config) ->
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
[file_transfer, storage, local, exporter],
|
[file_transfer, storage, local, exporter],
|
||||||
#{<<"local">> => #{}},
|
#{<<"local">> => #{<<"enable">> => true}},
|
||||||
#{}
|
#{}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
|
@ -45,9 +45,10 @@ init_per_testcase(TC, Config) ->
|
||||||
<<"enable">> => true,
|
<<"enable">> => true,
|
||||||
<<"storage">> => #{
|
<<"storage">> => #{
|
||||||
<<"local">> => #{
|
<<"local">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"segments">> => #{<<"root">> => SegmentsRoot},
|
<<"segments">> => #{<<"root">> => SegmentsRoot},
|
||||||
<<"exporter">> => #{
|
<<"exporter">> => #{
|
||||||
<<"local">> => #{<<"root">> => ExportsRoot}
|
<<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,17 +55,24 @@ local_storage(Config) ->
|
||||||
local_storage(Config, Opts) ->
|
local_storage(Config, Opts) ->
|
||||||
#{
|
#{
|
||||||
<<"local">> => #{
|
<<"local">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
|
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
|
||||||
<<"exporter">> => exporter(Config, Opts)
|
<<"exporter">> => exporter(Config, Opts)
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
exporter(Config, #{exporter := local}) ->
|
exporter(Config, #{exporter := local}) ->
|
||||||
#{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}};
|
#{
|
||||||
|
<<"local">> => #{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"root">> => root(Config, node(), [exports])
|
||||||
|
}
|
||||||
|
};
|
||||||
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
|
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
|
||||||
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
|
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
|
||||||
#{
|
#{
|
||||||
<<"s3">> => BaseConfig#{
|
<<"s3">> => BaseConfig#{
|
||||||
|
<<"enable">> => true,
|
||||||
<<"bucket">> => list_to_binary(BucketName),
|
<<"bucket">> => list_to_binary(BucketName),
|
||||||
<<"host">> => ?S3_HOST,
|
<<"host">> => ?S3_HOST,
|
||||||
<<"port">> => ?S3_PORT
|
<<"port">> => ?S3_PORT
|
||||||
|
|
|
@ -18,6 +18,9 @@ store_segment_timeout.desc:
|
||||||
"""Timeout for storing a file segment.<br/>
|
"""Timeout for storing a file segment.<br/>
|
||||||
After reaching the timeout, message with the segment will be acked with an error"""
|
After reaching the timeout, message with the segment will be acked with an error"""
|
||||||
|
|
||||||
|
backend_enable.desc:
|
||||||
|
"""Whether to enable this backend."""
|
||||||
|
|
||||||
storage_backend.desc:
|
storage_backend.desc:
|
||||||
"""Storage settings for file transfer."""
|
"""Storage settings for file transfer."""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue