Merge pull request #10986 from savonarola/ft/EMQX-9523/config-api
feat(ft-api): provide configuration API
This commit is contained in:
commit
58a83739b8
|
@ -305,7 +305,7 @@ default_appspec(emqx_conf, SuiteOpts) ->
|
|||
#{
|
||||
config => SharedConfig,
|
||||
% NOTE
|
||||
% We inform `emqx` of our config loader before starting `emqx_conf` sothat it won't
|
||||
% We inform `emqx` of our config loader before starting `emqx_conf` so that it won't
|
||||
% overwrite everything with a default configuration.
|
||||
before_start => fun() ->
|
||||
emqx_app:set_config_loader(?MODULE)
|
||||
|
|
|
@ -118,7 +118,7 @@
|
|||
-type route_path() :: string() | binary().
|
||||
-type route_methods() :: map().
|
||||
-type route_handler() :: atom().
|
||||
-type route_options() :: #{filter => filter() | undefined}.
|
||||
-type route_options() :: #{filter => filter()}.
|
||||
|
||||
-type api_spec_entry() :: {route_path(), route_methods(), route_handler(), route_options()}.
|
||||
-type api_spec_component() :: map().
|
||||
|
@ -137,10 +137,9 @@ spec(Module, Options) ->
|
|||
{ApiSpec, AllRefs} =
|
||||
lists:foldl(
|
||||
fun(Path, {AllAcc, AllRefsAcc}) ->
|
||||
{OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options),
|
||||
Opts = #{filter => filter(Options)},
|
||||
{OperationId, Specs, Refs, RouteOpts} = parse_spec_ref(Module, Path, Options),
|
||||
{
|
||||
[{filename:join("/", Path), Specs, OperationId, Opts} | AllAcc],
|
||||
[{filename:join("/", Path), Specs, OperationId, RouteOpts} | AllAcc],
|
||||
Refs ++ AllRefsAcc
|
||||
}
|
||||
end,
|
||||
|
@ -350,6 +349,7 @@ parse_spec_ref(Module, Path, Options) ->
|
|||
),
|
||||
error({failed_to_generate_swagger_spec, Module, Path})
|
||||
end,
|
||||
OperationId = maps:get('operationId', Schema),
|
||||
{Specs, Refs} = maps:fold(
|
||||
fun(Method, Meta, {Acc, RefsAcc}) ->
|
||||
(not lists:member(Method, ?METHODS)) andalso
|
||||
|
@ -358,9 +358,13 @@ parse_spec_ref(Module, Path, Options) ->
|
|||
{Acc#{Method => Spec}, SubRefs ++ RefsAcc}
|
||||
end,
|
||||
{#{}, []},
|
||||
maps:without(['operationId'], Schema)
|
||||
maps:without(['operationId', 'filter'], Schema)
|
||||
),
|
||||
{maps:get('operationId', Schema), Specs, Refs}.
|
||||
RouteOpts = generate_route_opts(Schema, Options),
|
||||
{OperationId, Specs, Refs, RouteOpts}.
|
||||
|
||||
generate_route_opts(Schema, Options) ->
|
||||
#{filter => compose_filters(filter(Options), custom_filter(Schema))}.
|
||||
|
||||
check_parameters(Request, Spec, Module) ->
|
||||
#{bindings := Bindings, query_string := QueryStr} = Request,
|
||||
|
@ -898,6 +902,8 @@ typename_to_spec("json_binary()", _Mod) ->
|
|||
#{type => string, example => <<"{\"a\": [1,true]}">>};
|
||||
typename_to_spec("port_number()", _Mod) ->
|
||||
range("1..65535");
|
||||
typename_to_spec("secret_access_key()", _Mod) ->
|
||||
#{type => string, example => <<"TW8dPwmjpjJJuLW....">>};
|
||||
typename_to_spec(Name, Mod) ->
|
||||
try_convert_to_spec(Name, Mod, [
|
||||
fun try_remote_module_type/2,
|
||||
|
|
|
@ -108,8 +108,12 @@ t_ref(_Config) ->
|
|||
LocalPath = "/test/in/ref/local",
|
||||
Path = "/test/in/ref",
|
||||
Expect = [#{<<"$ref">> => <<"#/components/parameters/emqx_swagger_parameter_SUITE.page">>}],
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, LocalPath, #{}),
|
||||
{OperationId, Spec, Refs, RouteOpts} = emqx_dashboard_swagger:parse_spec_ref(
|
||||
?MODULE, Path, #{}
|
||||
),
|
||||
{OperationId, Spec, Refs, RouteOpts} = emqx_dashboard_swagger:parse_spec_ref(
|
||||
?MODULE, LocalPath, #{}
|
||||
),
|
||||
?assertEqual(test, OperationId),
|
||||
Params = maps:get(parameters, maps:get(post, Spec)),
|
||||
?assertEqual(Expect, Params),
|
||||
|
@ -122,7 +126,7 @@ t_public_ref(_Config) ->
|
|||
#{<<"$ref">> => <<"#/components/parameters/public.page">>},
|
||||
#{<<"$ref">> => <<"#/components/parameters/public.limit">>}
|
||||
],
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
Params = maps:get(parameters, maps:get(post, Spec)),
|
||||
?assertEqual(Expect, Params),
|
||||
|
@ -264,7 +268,7 @@ t_nullable(_Config) ->
|
|||
t_method(_Config) ->
|
||||
PathOk = "/method/ok",
|
||||
PathError = "/method/error",
|
||||
{test, Spec, []} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, PathOk, #{}),
|
||||
{test, Spec, [], #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, PathOk, #{}),
|
||||
?assertEqual(lists:sort(?METHODS), lists:sort(maps:keys(Spec))),
|
||||
?assertThrow(
|
||||
{error, #{module := ?MODULE, path := PathError, method := bar}},
|
||||
|
@ -393,7 +397,7 @@ assert_all_filters_equal(Spec, Filter) ->
|
|||
).
|
||||
|
||||
validate(Path, ExpectParams) ->
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
Params = maps:get(parameters, maps:get(post, Spec)),
|
||||
?assertEqual(ExpectParams, Params),
|
||||
|
|
|
@ -719,7 +719,7 @@ t_object_trans_error(_Config) ->
|
|||
ok.
|
||||
|
||||
validate(Path, ExpectSpec, ExpectRefs) ->
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
?assertEqual(ExpectSpec, Spec),
|
||||
?assertEqual(ExpectRefs, Refs),
|
||||
|
|
|
@ -129,7 +129,7 @@ t_error(_Config) ->
|
|||
}
|
||||
}
|
||||
},
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
Response = maps:get(responses, maps:get(get, Spec)),
|
||||
?assertEqual(Error400, maps:get(<<"400">>, Response)),
|
||||
|
@ -375,7 +375,7 @@ t_complicated_type(_Config) ->
|
|||
}
|
||||
}
|
||||
},
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
Response = maps:get(responses, maps:get(post, Spec)),
|
||||
?assertEqual(Object, maps:get(<<"200">>, Response)),
|
||||
|
@ -665,7 +665,7 @@ schema("/fields/sub") ->
|
|||
to_schema(hoconsc:ref(sub_fields)).
|
||||
|
||||
validate(Path, ExpectObject, ExpectRefs) ->
|
||||
{OperationId, Spec, Refs} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
{OperationId, Spec, Refs, #{}} = emqx_dashboard_swagger:parse_spec_ref(?MODULE, Path, #{}),
|
||||
?assertEqual(test, OperationId),
|
||||
Response = maps:get(responses, maps:get(post, Spec)),
|
||||
?assertEqual(ExpectObject, maps:get(<<"200">>, Response)),
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_ft, [
|
||||
{description, "EMQX file transfer over MQTT"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{mod, {emqx_ft_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -40,27 +40,30 @@
|
|||
%% API callbacks
|
||||
-export([
|
||||
'/file_transfer/files'/2,
|
||||
'/file_transfer/files/:clientid/:fileid'/2
|
||||
'/file_transfer/files/:clientid/:fileid'/2,
|
||||
'/file_transfer'/2
|
||||
]).
|
||||
|
||||
-import(hoconsc, [mk/2, ref/1, ref/2]).
|
||||
|
||||
-define(SCHEMA_CONFIG, ref(emqx_ft_schema, file_transfer)).
|
||||
|
||||
namespace() -> "file_transfer".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{
|
||||
check_schema => true, filter => fun ?MODULE:check_ft_enabled/2
|
||||
}).
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
[
|
||||
"/file_transfer/files",
|
||||
"/file_transfer/files/:clientid/:fileid"
|
||||
"/file_transfer/files/:clientid/:fileid",
|
||||
"/file_transfer"
|
||||
].
|
||||
|
||||
schema("/file_transfer/files") ->
|
||||
#{
|
||||
'operationId' => '/file_transfer/files',
|
||||
filter => fun ?MODULE:check_ft_enabled/2,
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"List all uploaded files">>,
|
||||
|
@ -83,6 +86,7 @@ schema("/file_transfer/files") ->
|
|||
schema("/file_transfer/files/:clientid/:fileid") ->
|
||||
#{
|
||||
'operationId' => '/file_transfer/files/:clientid/:fileid',
|
||||
filter => fun ?MODULE:check_ft_enabled/2,
|
||||
get => #{
|
||||
tags => ?TAGS,
|
||||
summary => <<"List files uploaded in a specific transfer">>,
|
||||
|
@ -101,6 +105,30 @@ schema("/file_transfer/files/:clientid/:fileid") ->
|
|||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/file_transfer") ->
|
||||
#{
|
||||
'operationId' => '/file_transfer',
|
||||
get => #{
|
||||
tags => [<<"file_transfer">>],
|
||||
summary => <<"Get current File Transfer configuration">>,
|
||||
description => ?DESC("file_transfer_get_config"),
|
||||
responses => #{
|
||||
200 => ?SCHEMA_CONFIG
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
tags => [<<"file_transfer">>],
|
||||
summary => <<"Update File Transfer configuration">>,
|
||||
description => ?DESC("file_transfer_update_config"),
|
||||
'requestBody' => ?SCHEMA_CONFIG,
|
||||
responses => #{
|
||||
200 => ?SCHEMA_CONFIG,
|
||||
400 => emqx_dashboard_swagger:error_codes(
|
||||
['INVALID_CONFIG'], error_desc('INVALID_CONFIG')
|
||||
)
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
check_ft_enabled(Params, _Meta) ->
|
||||
|
@ -108,7 +136,7 @@ check_ft_enabled(Params, _Meta) ->
|
|||
true ->
|
||||
{ok, Params};
|
||||
false ->
|
||||
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
|
||||
{503, error_msg('SERVICE_UNAVAILABLE')}
|
||||
end.
|
||||
|
||||
'/file_transfer/files'(get, #{
|
||||
|
@ -147,6 +175,18 @@ check_ft_enabled(Params, _Meta) ->
|
|||
{503, error_msg('SERVICE_UNAVAILABLE')}
|
||||
end.
|
||||
|
||||
'/file_transfer'(get, _Meta) ->
|
||||
{200, format_config(emqx_ft_conf:get())};
|
||||
'/file_transfer'(put, #{body := ConfigIn}) ->
|
||||
case emqx_ft_conf:update(ConfigIn) of
|
||||
{ok, #{config := Config}} ->
|
||||
{200, format_config(Config)};
|
||||
{error, Error = #{kind := validation_error}} ->
|
||||
{400, error_msg('INVALID_CONFIG', format_validation_error(Error))};
|
||||
{error, Error} ->
|
||||
{400, error_msg('INVALID_CONFIG', emqx_utils:format(Error))}
|
||||
end.
|
||||
|
||||
format_page(#{items := Files, cursor := Cursor}) ->
|
||||
#{
|
||||
<<"files">> => lists:map(fun format_file_info/1, Files),
|
||||
|
@ -157,14 +197,23 @@ format_page(#{items := Files}) ->
|
|||
<<"files">> => lists:map(fun format_file_info/1, Files)
|
||||
}.
|
||||
|
||||
format_config(Config) ->
|
||||
Schema = emqx_hocon:make_schema(emqx_ft_schema:fields(file_transfer)),
|
||||
hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Config), #{}).
|
||||
|
||||
format_validation_error(Error) ->
|
||||
emqx_logger_jsonfmt:best_effort_json(Error).
|
||||
|
||||
error_msg(Code) ->
|
||||
#{code => Code, message => error_desc(Code)}.
|
||||
|
||||
error_msg(Code, Msg) ->
|
||||
#{code => Code, message => emqx_utils:readable_error_msg(Msg)}.
|
||||
#{code => Code, message => Msg}.
|
||||
|
||||
error_desc('FILES_NOT_FOUND') ->
|
||||
<<"Files requested for this transfer could not be found">>;
|
||||
error_desc('INVALID_CONFIG') ->
|
||||
<<"Provided configuration is invalid">>;
|
||||
error_desc('SERVICE_UNAVAILABLE') ->
|
||||
<<"Service unavailable">>.
|
||||
|
||||
|
|
|
@ -18,13 +18,16 @@
|
|||
|
||||
-behaviour(application).
|
||||
|
||||
-export([start/2, stop/1]).
|
||||
-export([start/2, prep_stop/1, stop/1]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
{ok, Sup} = emqx_ft_sup:start_link(),
|
||||
ok = emqx_ft_conf:load(),
|
||||
{ok, Sup}.
|
||||
|
||||
stop(_State) ->
|
||||
prep_stop(State) ->
|
||||
ok = emqx_ft_conf:unload(),
|
||||
State.
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-module(emqx_ft_conf).
|
||||
|
||||
-behaviour(emqx_config_handler).
|
||||
-behaviour(emqx_config_backup).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
|
@ -34,7 +35,9 @@
|
|||
%% Load/Unload
|
||||
-export([
|
||||
load/0,
|
||||
unload/0
|
||||
unload/0,
|
||||
get/0,
|
||||
update/1
|
||||
]).
|
||||
|
||||
%% callbacks for emqx_config_handler
|
||||
|
@ -43,6 +46,13 @@
|
|||
post_config_update/5
|
||||
]).
|
||||
|
||||
%% callbacks for emqx_config_backup
|
||||
-export([
|
||||
import_config/1
|
||||
]).
|
||||
|
||||
-type update_request() :: emqx_config:config().
|
||||
|
||||
-type milliseconds() :: non_neg_integer().
|
||||
-type seconds() :: non_neg_integer().
|
||||
|
||||
|
@ -95,49 +105,118 @@ load() ->
|
|||
|
||||
-spec unload() -> ok.
|
||||
unload() ->
|
||||
ok = stop(),
|
||||
emqx_conf:remove_handler([file_transfer]).
|
||||
ok = emqx_conf:remove_handler([file_transfer]),
|
||||
maybe_stop().
|
||||
|
||||
-spec get() -> emqx_config:config().
|
||||
get() ->
|
||||
emqx_config:get([file_transfer]).
|
||||
|
||||
-spec update(emqx_config:config()) -> {ok, emqx_config:update_result()} | {error, term()}.
|
||||
update(Config) ->
|
||||
emqx_conf:update([file_transfer], Config, #{override_to => cluster}).
|
||||
|
||||
%%----------------------------------------------------------------------------------------
|
||||
%% Data backup
|
||||
%%----------------------------------------------------------------------------------------
|
||||
|
||||
import_config(#{<<"file_transfer">> := FTConf}) ->
|
||||
OldFTConf = emqx:get_raw_config([file_transfer], #{}),
|
||||
NewFTConf = maps:merge(OldFTConf, FTConf),
|
||||
case emqx_conf:update([file_transfer], NewFTConf, #{override_to => cluster}) of
|
||||
{ok, #{raw_config := NewRawConf}} ->
|
||||
Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, FTConf)),
|
||||
ChangedPaths = [[file_transfer, K] || K <- maps:keys(Changed)],
|
||||
{ok, #{root_key => file_transfer, changed => ChangedPaths}};
|
||||
Error ->
|
||||
{error, #{root_key => file_transfer, reason => Error}}
|
||||
end;
|
||||
import_config(_) ->
|
||||
{ok, #{root_key => file_transfer, changed => []}}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% emqx_config_handler callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
|
||||
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
|
||||
{ok, emqx_config:update_request()} | {error, term()}.
|
||||
pre_config_update(_, Req, _Config) ->
|
||||
{ok, Req}.
|
||||
pre_config_update([file_transfer | _], NewConfig, OldConfig) ->
|
||||
propagate_config_update(
|
||||
fun emqx_ft_storage_exporter_s3:pre_config_update/3,
|
||||
[<<"storage">>, <<"local">>, <<"exporter">>, <<"s3">>],
|
||||
NewConfig,
|
||||
OldConfig
|
||||
).
|
||||
|
||||
-spec post_config_update(
|
||||
list(atom()),
|
||||
emqx_config:update_request(),
|
||||
update_request(),
|
||||
emqx_config:config(),
|
||||
emqx_config:config(),
|
||||
emqx_config:app_envs()
|
||||
) ->
|
||||
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
||||
post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
|
||||
on_config_update(OldConfig, NewConfig).
|
||||
PropResult = propagate_config_update(
|
||||
fun emqx_ft_storage_exporter_s3:post_config_update/3,
|
||||
[storage, local, exporter, s3],
|
||||
NewConfig,
|
||||
OldConfig
|
||||
),
|
||||
case PropResult of
|
||||
ok ->
|
||||
on_config_update(OldConfig, NewConfig);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
propagate_config_update(Fun, ConfKey, NewConfig, OldConfig) ->
|
||||
NewSubConf = emqx_utils_maps:deep_get(ConfKey, NewConfig, undefined),
|
||||
OldSubConf = emqx_utils_maps:deep_get(ConfKey, OldConfig, undefined),
|
||||
case Fun(ConfKey, NewSubConf, OldSubConf) of
|
||||
ok ->
|
||||
ok;
|
||||
{ok, undefined} ->
|
||||
{ok, NewConfig};
|
||||
{ok, NewSubConfUpdate} ->
|
||||
{ok, emqx_utils_maps:deep_put(ConfKey, NewConfig, NewSubConfUpdate)};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
on_config_update(#{enable := false}, #{enable := false}) ->
|
||||
ok;
|
||||
on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) ->
|
||||
ok = emqx_ft_storage:on_config_update(OldStorage, undefined),
|
||||
ok = emqx_ft:unhook();
|
||||
ok = stop(OldStorage);
|
||||
on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) ->
|
||||
ok = emqx_ft_storage:on_config_update(undefined, NewStorage),
|
||||
ok = emqx_ft:hook();
|
||||
ok = start(NewStorage);
|
||||
on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) ->
|
||||
ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage).
|
||||
ok = emqx_ft_storage:update_config(OldStorage, NewStorage).
|
||||
|
||||
maybe_start() ->
|
||||
case emqx_config:get([file_transfer]) of
|
||||
#{enable := true, storage := Storage} ->
|
||||
ok = emqx_ft_storage:on_config_update(undefined, Storage),
|
||||
ok = emqx_ft:hook();
|
||||
start(Storage);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
stop() ->
|
||||
maybe_stop() ->
|
||||
case emqx_config:get([file_transfer]) of
|
||||
#{enable := true, storage := Storage} ->
|
||||
stop(Storage);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
start(Storage) ->
|
||||
ok = emqx_ft_storage:update_config(undefined, Storage),
|
||||
ok = emqx_ft:hook().
|
||||
|
||||
stop(Storage) ->
|
||||
ok = emqx_ft:unhook(),
|
||||
ok = emqx_ft_storage:on_config_update(storage(), undefined).
|
||||
ok = emqx_ft_storage:update_config(Storage, undefined).
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_ft_storage).
|
||||
|
||||
-include_lib("emqx/include/types.hrl").
|
||||
|
||||
-export(
|
||||
[
|
||||
store_filemeta/2,
|
||||
|
@ -29,7 +31,7 @@
|
|||
with_storage_type/3,
|
||||
|
||||
backend/0,
|
||||
on_config_update/2
|
||||
update_config/2
|
||||
]
|
||||
).
|
||||
|
||||
|
@ -94,10 +96,10 @@
|
|||
-callback files(storage(), query(Cursor)) ->
|
||||
{ok, page(file_info(), Cursor)} | {error, term()}.
|
||||
|
||||
-callback start(emqx_config:config()) -> any().
|
||||
-callback stop(emqx_config:config()) -> any().
|
||||
-callback start(storage()) -> any().
|
||||
-callback stop(storage()) -> any().
|
||||
|
||||
-callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) ->
|
||||
-callback update_config(_OldConfig :: maybe(storage()), _NewConfig :: maybe(storage())) ->
|
||||
any().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -157,9 +159,9 @@ with_storage_type(Type, Fun, Args) ->
|
|||
backend() ->
|
||||
backend(emqx_ft_conf:storage()).
|
||||
|
||||
-spec on_config_update(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
|
||||
-spec update_config(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
|
||||
ok.
|
||||
on_config_update(ConfigOld, ConfigNew) ->
|
||||
update_config(ConfigOld, ConfigNew) ->
|
||||
on_backend_update(
|
||||
emqx_maybe:apply(fun backend/1, ConfigOld),
|
||||
emqx_maybe:apply(fun backend/1, ConfigNew)
|
||||
|
@ -168,13 +170,13 @@ on_config_update(ConfigOld, ConfigNew) ->
|
|||
on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
|
||||
ok;
|
||||
on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
|
||||
ok = (mod(Type)):on_config_update(StorageOld, StorageNew);
|
||||
ok = (mod(Type)):update_config(StorageOld, StorageNew);
|
||||
on_backend_update(BackendOld, BackendNew) when
|
||||
(BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
|
||||
(BackendNew =:= undefined orelse is_tuple(BackendNew))
|
||||
->
|
||||
_ = emqx_maybe:apply(fun on_storage_stop/1, BackendOld),
|
||||
_ = emqx_maybe:apply(fun on_storage_start/1, BackendNew),
|
||||
_ = emqx_maybe:apply(fun stop_backend/1, BackendOld),
|
||||
_ = emqx_maybe:apply(fun start_backend/1, BackendNew),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -185,10 +187,10 @@ on_backend_update(BackendOld, BackendNew) when
|
|||
backend(Config) ->
|
||||
emqx_ft_schema:backend(Config).
|
||||
|
||||
on_storage_start({Type, Storage}) ->
|
||||
start_backend({Type, Storage}) ->
|
||||
(mod(Type)):start(Storage).
|
||||
|
||||
on_storage_stop({Type, Storage}) ->
|
||||
stop_backend({Type, Storage}) ->
|
||||
(mod(Type)):stop(Storage).
|
||||
|
||||
mod(local) ->
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
-export([list/2]).
|
||||
|
||||
%% Lifecycle API
|
||||
-export([on_config_update/2]).
|
||||
-export([update_config/2]).
|
||||
|
||||
%% Internal API
|
||||
-export([exporter/1]).
|
||||
|
@ -81,7 +81,7 @@
|
|||
-callback stop(exporter_conf()) ->
|
||||
ok.
|
||||
|
||||
-callback update(exporter_conf(), exporter_conf()) ->
|
||||
-callback update_config(exporter_conf(), exporter_conf()) ->
|
||||
ok | {error, _Reason}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -148,8 +148,8 @@ list(Storage, Query) ->
|
|||
|
||||
%% Lifecycle
|
||||
|
||||
-spec on_config_update(storage(), storage()) -> ok | {error, term()}.
|
||||
on_config_update(StorageOld, StorageNew) ->
|
||||
-spec update_config(storage(), storage()) -> ok | {error, term()}.
|
||||
update_config(StorageOld, StorageNew) ->
|
||||
on_exporter_update(
|
||||
emqx_maybe:apply(fun exporter/1, StorageOld),
|
||||
emqx_maybe:apply(fun exporter/1, StorageNew)
|
||||
|
@ -158,7 +158,7 @@ on_config_update(StorageOld, StorageNew) ->
|
|||
on_exporter_update(Config, Config) ->
|
||||
ok;
|
||||
on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
|
||||
ExporterMod:update(ConfigOld, ConfigNew);
|
||||
ExporterMod:update_config(ConfigOld, ConfigNew);
|
||||
on_exporter_update(ExporterOld, ExporterNew) ->
|
||||
_ = emqx_maybe:apply(fun stop/1, ExporterOld),
|
||||
_ = emqx_maybe:apply(fun start/1, ExporterNew),
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
-export([
|
||||
start/1,
|
||||
stop/1,
|
||||
update/2
|
||||
update_config/2
|
||||
]).
|
||||
|
||||
%% Internal API for RPC
|
||||
|
@ -161,8 +161,8 @@ start(_Options) -> ok.
|
|||
-spec stop(options()) -> ok.
|
||||
stop(_Options) -> ok.
|
||||
|
||||
-spec update(options(), options()) -> ok.
|
||||
update(_OldOptions, _NewOptions) -> ok.
|
||||
-spec update_config(options(), options()) -> ok.
|
||||
update_config(_OldOptions, _NewOptions) -> ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal API
|
||||
|
|
|
@ -28,7 +28,12 @@
|
|||
-export([
|
||||
start/1,
|
||||
stop/1,
|
||||
update/2
|
||||
update_config/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
pre_config_update/3,
|
||||
post_config_update/3
|
||||
]).
|
||||
|
||||
-type options() :: emqx_s3:profile_config().
|
||||
|
@ -112,12 +117,22 @@ start(Options) ->
|
|||
|
||||
-spec stop(options()) -> ok.
|
||||
stop(_Options) ->
|
||||
ok = emqx_s3:stop_profile(?S3_PROFILE_ID).
|
||||
emqx_s3:stop_profile(?S3_PROFILE_ID).
|
||||
|
||||
-spec update(options(), options()) -> ok.
|
||||
update(_OldOptions, NewOptions) ->
|
||||
-spec update_config(options(), options()) -> ok.
|
||||
update_config(_OldOptions, NewOptions) ->
|
||||
emqx_s3:update_profile(?S3_PROFILE_ID, NewOptions).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Config update hooks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
pre_config_update(_ConfKey, NewOptions, OldOptions) ->
|
||||
emqx_s3:pre_config_update(?S3_PROFILE_ID, NewOptions, OldOptions).
|
||||
|
||||
post_config_update(_ConfKey, NewOptions, OldOptions) ->
|
||||
emqx_s3:post_config_update(?S3_PROFILE_ID, NewOptions, OldOptions).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
|
@ -48,9 +48,9 @@
|
|||
|
||||
-export([files/2]).
|
||||
|
||||
-export([on_config_update/2]).
|
||||
-export([start/1]).
|
||||
-export([stop/1]).
|
||||
-export([update_config/2]).
|
||||
|
||||
-export_type([storage/0]).
|
||||
-export_type([filefrag/1]).
|
||||
|
@ -230,10 +230,10 @@ files(Storage, Query) ->
|
|||
|
||||
%%
|
||||
|
||||
on_config_update(StorageOld, StorageNew) ->
|
||||
update_config(StorageOld, StorageNew) ->
|
||||
% NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
|
||||
ok = emqx_ft_storage_fs_gc:reset(StorageNew),
|
||||
emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
|
||||
emqx_ft_storage_exporter:update_config(StorageOld, StorageNew).
|
||||
|
||||
start(Storage) ->
|
||||
ok = lists:foreach(
|
||||
|
@ -242,11 +242,11 @@ start(Storage) ->
|
|||
end,
|
||||
child_spec(Storage)
|
||||
),
|
||||
ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage),
|
||||
ok = emqx_ft_storage_exporter:update_config(undefined, Storage),
|
||||
ok.
|
||||
|
||||
stop(Storage) ->
|
||||
ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined),
|
||||
ok = emqx_ft_storage_exporter:update_config(Storage, undefined),
|
||||
ok = lists:foreach(
|
||||
fun(#{id := ChildId}) ->
|
||||
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
|
||||
|
|
|
@ -24,58 +24,24 @@
|
|||
|
||||
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, single},
|
||||
{group, cluster}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{single, [], emqx_common_test_helpers:all(?MODULE)},
|
||||
{cluster, [], emqx_common_test_helpers:all(?MODULE) -- [t_ft_disabled]}
|
||||
].
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
suite() ->
|
||||
[{timetrap, {seconds, 90}}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
init_per_group(Group = single, Config) ->
|
||||
WorkDir = ?config(priv_dir, Config),
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
{emqx, #{}},
|
||||
{emqx_ft, "file_transfer { enable = true }"},
|
||||
{emqx_management, #{}},
|
||||
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||
],
|
||||
#{work_dir => WorkDir}
|
||||
),
|
||||
{ok, App} = emqx_common_test_http:create_default_app(),
|
||||
[{group, Group}, {group_apps, Apps}, {api, App} | Config];
|
||||
init_per_group(Group = cluster, Config) ->
|
||||
WorkDir = ?config(priv_dir, Config),
|
||||
Cluster = mk_cluster_specs(Config),
|
||||
Nodes = [Node1 | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
|
||||
{ok, App} = erpc:call(Node1, emqx_common_test_http, create_default_app, []),
|
||||
[{group, Group}, {cluster_nodes, Nodes}, {api, App} | Config].
|
||||
[{cluster_nodes, Nodes}, {api, App} | Config].
|
||||
|
||||
end_per_group(single, Config) ->
|
||||
{ok, _} = emqx_common_test_http:delete_default_app(),
|
||||
ok = emqx_cth_suite:stop(?config(group_apps, Config));
|
||||
end_per_group(cluster, Config) ->
|
||||
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config));
|
||||
end_per_group(_Group, _Config) ->
|
||||
ok.
|
||||
end_per_suite(Config) ->
|
||||
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)).
|
||||
|
||||
mk_cluster_specs(_Config) ->
|
||||
Apps = [
|
||||
{emqx_conf, #{start => false}},
|
||||
emqx_conf,
|
||||
{emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
|
||||
{emqx_ft, "file_transfer { enable = true }"},
|
||||
{emqx_management, #{}}
|
||||
|
@ -106,9 +72,8 @@ mk_cluster_specs(_Config) ->
|
|||
|
||||
init_per_testcase(Case, Config) ->
|
||||
[{tc, Case} | Config].
|
||||
end_per_testcase(t_ft_disabled, _Config) ->
|
||||
emqx_config:put([file_transfer, enable], true);
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
end_per_testcase(_Case, Config) ->
|
||||
ok = reset_ft_config(Config, true),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -294,7 +259,7 @@ t_ft_disabled(Config) ->
|
|||
)
|
||||
),
|
||||
|
||||
ok = emqx_config:put([file_transfer, enable], false),
|
||||
ok = reset_ft_config(Config, false),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 503, _},
|
||||
|
@ -310,17 +275,161 @@ t_ft_disabled(Config) ->
|
|||
)
|
||||
).
|
||||
|
||||
t_configure(Config) ->
|
||||
?assertMatch(
|
||||
{ok, 200, #{<<"enable">> := true, <<"storage">> := #{}}},
|
||||
request_json(get, uri(["file_transfer"]), Config)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 200, #{<<"enable">> := false}},
|
||||
request_json(put, uri(["file_transfer"]), #{<<"enable">> => false}, Config)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 200, #{<<"enable">> := false}},
|
||||
request_json(get, uri(["file_transfer"]), Config)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 200, #{}},
|
||||
request_json(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => emqx_ft_test_helpers:local_storage(Config)
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 400, _},
|
||||
request(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{},
|
||||
<<"remote">> => #{}
|
||||
}
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 400, _},
|
||||
request(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"gc">> => #{<<"interval">> => -42}
|
||||
}
|
||||
}
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
S3Exporter = #{
|
||||
<<"host">> => <<"localhost">>,
|
||||
<<"port">> => 9000,
|
||||
<<"bucket">> => <<"emqx">>,
|
||||
<<"transport_options">> => #{
|
||||
<<"ssl">> => #{
|
||||
<<"enable">> => true,
|
||||
<<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
|
||||
<<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
|
||||
}
|
||||
}
|
||||
},
|
||||
?assertMatch(
|
||||
{ok, 200, #{
|
||||
<<"enable">> := true,
|
||||
<<"storage">> := #{
|
||||
<<"local">> := #{
|
||||
<<"exporter">> := #{
|
||||
<<"s3">> := #{
|
||||
<<"transport_options">> := #{
|
||||
<<"ssl">> := #{
|
||||
<<"enable">> := true,
|
||||
<<"certfile">> := <<"/", _CertFilepath/bytes>>,
|
||||
<<"keyfile">> := <<"/", _KeyFilepath/bytes>>
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}},
|
||||
request_json(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"exporter">> => #{
|
||||
<<"s3">> => S3Exporter
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 400, _},
|
||||
request_json(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"exporter">> => #{
|
||||
<<"s3">> => emqx_utils_maps:deep_put(
|
||||
[<<"transport_options">>, <<"ssl">>, <<"keyfile">>],
|
||||
S3Exporter,
|
||||
<<>>
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, 200, #{}},
|
||||
request_json(
|
||||
put,
|
||||
uri(["file_transfer"]),
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"exporter">> => #{
|
||||
<<"s3">> => emqx_utils_maps:deep_put(
|
||||
[<<"transport_options">>, <<"ssl">>, <<"enable">>],
|
||||
S3Exporter,
|
||||
false
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Config
|
||||
)
|
||||
),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
test_nodes(Config) ->
|
||||
case proplists:get_value(cluster_nodes, Config, []) of
|
||||
[] ->
|
||||
[node()];
|
||||
Nodes ->
|
||||
Nodes
|
||||
end.
|
||||
?config(cluster_nodes, Config).
|
||||
|
||||
client_id(Config) ->
|
||||
iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])).
|
||||
|
@ -332,17 +441,26 @@ mk_file_name(N) ->
|
|||
"file." ++ integer_to_list(N).
|
||||
|
||||
request(Method, Url, Config) ->
|
||||
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
|
||||
emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), [], Opts).
|
||||
request(Method, Url, [], Config).
|
||||
|
||||
request_json(Method, Url, Config) ->
|
||||
case request(Method, Url, Config) of
|
||||
{ok, Code, Body} ->
|
||||
{ok, Code, json(Body)};
|
||||
request(Method, Url, Body, Config) ->
|
||||
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
|
||||
request(Method, Url, Body, Opts, Config).
|
||||
|
||||
request(Method, Url, Body, Opts, Config) ->
|
||||
emqx_mgmt_api_test_util:request_api(Method, Url, [], auth_header(Config), Body, Opts).
|
||||
|
||||
request_json(Method, Url, Body, Config) ->
|
||||
case request(Method, Url, Body, Config) of
|
||||
{ok, Code, RespBody} ->
|
||||
{ok, Code, json(RespBody)};
|
||||
Otherwise ->
|
||||
Otherwise
|
||||
end.
|
||||
|
||||
request_json(Method, Url, Config) ->
|
||||
request_json(Method, Url, [], Config).
|
||||
|
||||
json(Body) when is_binary(Body) ->
|
||||
emqx_utils_json:decode(Body, [return_maps]).
|
||||
|
||||
|
@ -368,3 +486,17 @@ to_list(L) when is_list(L) ->
|
|||
|
||||
pick(N, List) ->
|
||||
lists:nth(1 + (N rem length(List)), List).
|
||||
|
||||
reset_ft_config(Config, Enable) ->
|
||||
[Node | _] = test_nodes(Config),
|
||||
LocalConfig =
|
||||
#{
|
||||
<<"enable">> => Enable,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"enable">> => true
|
||||
}
|
||||
}
|
||||
},
|
||||
{ok, _} = rpc:call(Node, emqx_ft_conf, update, [LocalConfig]),
|
||||
ok.
|
||||
|
|
|
@ -53,16 +53,13 @@ end_per_testcase(_Case, Config) ->
|
|||
t_update_config(_Config) ->
|
||||
?assertMatch(
|
||||
{error, #{kind := validation_error}},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
#{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}},
|
||||
#{}
|
||||
emqx_ft_conf:update(
|
||||
#{<<"storage">> => #{<<"unknown">> => #{<<"foo">> => 42}}}
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
emqx_ft_conf:update(
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
|
@ -81,8 +78,7 @@ t_update_config(_Config) ->
|
|||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
#{}
|
||||
}
|
||||
)
|
||||
),
|
||||
?assertEqual(
|
||||
|
@ -101,13 +97,8 @@ t_update_config(_Config) ->
|
|||
t_disable_restore_config(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{<<"local">> => #{}}
|
||||
},
|
||||
#{}
|
||||
emqx_ft_conf:update(
|
||||
#{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}}
|
||||
)
|
||||
),
|
||||
?assertEqual(
|
||||
|
@ -119,11 +110,7 @@ t_disable_restore_config(Config) ->
|
|||
% Verify that clearing storage settings reverts config to defaults
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
#{<<"enable">> => false, <<"storage">> => undefined},
|
||||
#{}
|
||||
)
|
||||
emqx_ft_conf:update(#{<<"enable">> => false, <<"storage">> => undefined})
|
||||
),
|
||||
?assertEqual(
|
||||
false,
|
||||
|
@ -155,8 +142,7 @@ t_disable_restore_config(Config) ->
|
|||
Root = emqx_ft_test_helpers:root(Config, node(), [segments]),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
emqx_ft_conf:update(
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
|
@ -167,8 +153,7 @@ t_disable_restore_config(Config) ->
|
|||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
#{}
|
||||
}
|
||||
)
|
||||
),
|
||||
% Verify that GC is getting triggered eventually
|
||||
|
@ -192,11 +177,7 @@ t_disable_restore_config(Config) ->
|
|||
t_switch_exporter(_Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_conf:update(
|
||||
[file_transfer],
|
||||
#{<<"enable">> => true},
|
||||
#{}
|
||||
)
|
||||
emqx_ft_conf:update(#{<<"enable">> => true})
|
||||
),
|
||||
?assertMatch(
|
||||
#{local := #{exporter := #{local := _}}},
|
||||
|
@ -248,5 +229,129 @@ t_switch_exporter(_Config) ->
|
|||
% Verify that transfers work
|
||||
ok = emqx_ft_test_helpers:upload_file(gen_clientid(), <<"f1">>, "f1", <<?MODULE_STRING>>).
|
||||
|
||||
t_persist_ssl_certfiles(Config) ->
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_ft_conf:update(mk_storage(true))
|
||||
),
|
||||
?assertEqual(
|
||||
[],
|
||||
list_ssl_certfiles(Config)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {pre_config_update, _, {bad_ssl_config, #{}}}},
|
||||
emqx_ft_conf:update(
|
||||
mk_storage(true, #{
|
||||
<<"s3">> => mk_s3_config(#{
|
||||
<<"transport_options">> => #{
|
||||
<<"ssl">> => #{
|
||||
<<"certfile">> => <<"cert.pem">>,
|
||||
<<"keyfile">> => <<"key.pem">>
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_ft_conf:update(
|
||||
mk_storage(false, #{
|
||||
<<"s3">> => mk_s3_config(#{
|
||||
<<"transport_options">> => #{
|
||||
<<"ssl">> => #{
|
||||
<<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
|
||||
<<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
#{
|
||||
local := #{
|
||||
exporter := #{
|
||||
s3 := #{
|
||||
transport_options := #{
|
||||
ssl := #{
|
||||
certfile := <<"/", _CertFilepath/binary>>,
|
||||
keyfile := <<"/", _KeyFilepath/binary>>
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
emqx_ft_conf:storage()
|
||||
),
|
||||
?assertMatch(
|
||||
[_Certfile, _Keyfile],
|
||||
list_ssl_certfiles(Config)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_ft_conf:update(mk_storage(true))
|
||||
).
|
||||
|
||||
t_import(_Config) ->
|
||||
{ok, _} =
|
||||
emqx_ft_conf:update(
|
||||
mk_storage(true, #{
|
||||
<<"s3">> => mk_s3_config(#{
|
||||
<<"transport_options">> => #{
|
||||
<<"ssl">> => #{
|
||||
<<"certfile">> => emqx_ft_test_helpers:pem_privkey(),
|
||||
<<"keyfile">> => emqx_ft_test_helpers:pem_privkey()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
),
|
||||
|
||||
BackupConfig = emqx_config:get_raw([]),
|
||||
FTBackupConfig = maps:with([<<"file_transfer">>], BackupConfig),
|
||||
|
||||
{ok, _} = emqx_ft_conf:update(mk_storage(true)),
|
||||
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_ft_conf:import_config(FTBackupConfig)
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
#{local := #{exporter := #{s3 := #{enable := true}}}},
|
||||
emqx_ft_conf:storage()
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
mk_storage(Enabled) ->
|
||||
mk_storage(Enabled, #{<<"local">> => #{}}).
|
||||
|
||||
mk_storage(Enabled, Exporter) ->
|
||||
#{
|
||||
<<"enable">> => Enabled,
|
||||
<<"storage">> => #{
|
||||
<<"local">> => #{
|
||||
<<"exporter">> => Exporter
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
mk_s3_config(S3Config) ->
|
||||
BaseS3Config = #{
|
||||
<<"bucket">> => <<"emqx">>,
|
||||
<<"host">> => <<"https://localhost">>,
|
||||
<<"port">> => 9000
|
||||
},
|
||||
maps:merge(BaseS3Config, S3Config).
|
||||
|
||||
gen_clientid() ->
|
||||
emqx_base62:encode(emqx_guid:gen()).
|
||||
|
||||
list_ssl_certfiles(_Config) ->
|
||||
CertDir = emqx:mutable_certs_dir(),
|
||||
filelib:fold_files(CertDir, ".*", true, fun(Filepath, Acc) -> [Filepath | Acc] end, []).
|
||||
|
|
|
@ -119,3 +119,13 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
|
|||
|
||||
aws_config() ->
|
||||
emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).
|
||||
|
||||
pem_privkey() ->
|
||||
<<
|
||||
"\n"
|
||||
"-----BEGIN EC PRIVATE KEY-----\n"
|
||||
"MHQCAQEEICKTbbathzvD8zvgjL7qRHhW4alS0+j0Loo7WeYX9AxaoAcGBSuBBAAK\n"
|
||||
"oUQDQgAEJBdF7MIdam5T4YF3JkEyaPKdG64TVWCHwr/plC0QzNVJ67efXwxlVGTo\n"
|
||||
"ju0VBj6tOX1y6C0U+85VOM0UU5xqvw==\n"
|
||||
"-----END EC PRIVATE KEY-----\n"
|
||||
>>.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_s3, [
|
||||
{description, "EMQX S3"},
|
||||
{vsn, "5.0.8"},
|
||||
{vsn, "5.0.9"},
|
||||
{modules, []},
|
||||
{registered, [emqx_s3_sup]},
|
||||
{applications, [
|
||||
|
|
|
@ -14,6 +14,11 @@
|
|||
with_client/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
pre_config_update/3,
|
||||
post_config_update/3
|
||||
]).
|
||||
|
||||
-export_type([
|
||||
profile_id/0,
|
||||
profile_config/0,
|
||||
|
@ -94,3 +99,31 @@ with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(Prof
|
|||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
%%
|
||||
|
||||
-spec pre_config_update(
|
||||
profile_id(), maybe(emqx_config:raw_config()), maybe(emqx_config:raw_config())
|
||||
) ->
|
||||
{ok, maybe(profile_config())} | {error, term()}.
|
||||
pre_config_update(ProfileId, NewConfig = #{<<"transport_options">> := TransportOpts}, _OldConfig) ->
|
||||
case emqx_connector_ssl:convert_certs(mk_certs_dir(ProfileId), TransportOpts) of
|
||||
{ok, TransportOptsConv} ->
|
||||
{ok, NewConfig#{<<"transport_options">> := TransportOptsConv}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
pre_config_update(_ProfileId, NewConfig, _OldConfig) ->
|
||||
{ok, NewConfig}.
|
||||
|
||||
-spec post_config_update(
|
||||
profile_id(),
|
||||
maybe(emqx_config:config()),
|
||||
maybe(emqx_config:config())
|
||||
) ->
|
||||
ok.
|
||||
post_config_update(_ProfileId, _NewConfig, _OldConfig) ->
|
||||
ok.
|
||||
|
||||
mk_certs_dir(ProfileId) ->
|
||||
filename:join([s3, profiles, ProfileId]).
|
||||
|
|
|
@ -14,6 +14,9 @@
|
|||
-export([translate/1]).
|
||||
-export([translate/2]).
|
||||
|
||||
-type secret_access_key() :: string() | function().
|
||||
-reflect_type([secret_access_key/0]).
|
||||
|
||||
roots() ->
|
||||
[s3].
|
||||
|
||||
|
@ -34,7 +37,7 @@ fields(s3) ->
|
|||
)},
|
||||
{secret_access_key,
|
||||
mk(
|
||||
hoconsc:union([string(), function()]),
|
||||
secret_access_key(),
|
||||
#{
|
||||
desc => ?DESC("secret_access_key"),
|
||||
required => false,
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_utils, [
|
||||
{description, "Miscellaneous utilities for EMQX apps"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.4"},
|
||||
{vsn, "5.0.5"},
|
||||
{modules, [
|
||||
emqx_utils,
|
||||
emqx_utils_api,
|
||||
|
|
|
@ -60,7 +60,8 @@
|
|||
safe_filename/1,
|
||||
diff_lists/3,
|
||||
merge_lists/3,
|
||||
tcp_keepalive_opts/4
|
||||
tcp_keepalive_opts/4,
|
||||
format/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -525,6 +526,9 @@ tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) ->
|
|||
tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
|
||||
{error, {unsupported_os, OS}}.
|
||||
|
||||
format(Term) ->
|
||||
iolist_to_binary(io_lib:format("~0p", [Term])).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal Functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -606,7 +610,7 @@ to_hr_error({not_authorized, _}) ->
|
|||
to_hr_error({malformed_username_or_password, _}) ->
|
||||
<<"Bad username or password">>;
|
||||
to_hr_error(Error) ->
|
||||
iolist_to_binary(io_lib:format("~0p", [Error])).
|
||||
format(Error).
|
||||
|
||||
try_to_existing_atom(Convert, Data, Encoding) ->
|
||||
try Convert(Data, Encoding) of
|
||||
|
|
|
@ -6,6 +6,12 @@ file_list.desc:
|
|||
file_list_transfer.desc:
|
||||
"""List a file uploaded during specified transfer, identified by client id and file id."""
|
||||
|
||||
file_transfer_get_config.desc:
|
||||
"""Show current File Transfer configuration."""
|
||||
|
||||
file_transfer_update_config.desc:
|
||||
"""Replace File Transfer configuration."""
|
||||
|
||||
}
|
||||
|
||||
emqx_ft_storage_exporter_fs_api {
|
||||
|
|
Loading…
Reference in New Issue