Merge pull request #10555 from savonarola/0427-file-transfer-more-s3-tests
chore(ft): add s3 exporter tests
This commit is contained in:
commit
6205a2ba2b
|
@ -29,6 +29,7 @@
|
|||
-export([file_schema/1]).
|
||||
-export([base_path/0]).
|
||||
-export([relative_uri/1]).
|
||||
-export([compose_filters/2]).
|
||||
|
||||
-export([filter_check_request/2, filter_check_request_and_translate_body/2]).
|
||||
|
||||
|
@ -82,14 +83,30 @@
|
|||
-type request() :: #{bindings => map(), query_string => map(), body => map()}.
|
||||
-type request_meta() :: #{module => module(), path => string(), method => atom()}.
|
||||
|
||||
-type filter_result() :: {ok, request()} | {400, 'BAD_REQUEST', binary()}.
|
||||
-type filter() :: fun((request(), request_meta()) -> filter_result()).
|
||||
%% More exact types are defined in minirest.hrl, but we don't want to include it
|
||||
%% because it defines a lot of types and they may clash with the types declared locally.
|
||||
-type status_code() :: pos_integer().
|
||||
-type error_code() :: atom() | binary().
|
||||
-type error_message() :: binary().
|
||||
-type response_body() :: term().
|
||||
-type headers() :: map().
|
||||
|
||||
-type response() ::
|
||||
status_code()
|
||||
| {status_code()}
|
||||
| {status_code(), response_body()}
|
||||
| {status_code(), headers(), response_body()}
|
||||
| {status_code(), error_code(), error_message()}.
|
||||
|
||||
-type filter_result() :: {ok, request()} | response().
|
||||
-type filter() :: emqx_maybe:t(fun((request(), request_meta()) -> filter_result())).
|
||||
|
||||
-type spec_opts() :: #{
|
||||
check_schema => boolean() | filter(),
|
||||
translate_body => boolean(),
|
||||
schema_converter => fun((hocon_schema:schema(), Module :: atom()) -> map()),
|
||||
i18n_lang => atom() | string() | binary()
|
||||
i18n_lang => atom() | string() | binary(),
|
||||
filter => filter()
|
||||
}.
|
||||
|
||||
-type route_path() :: string() | binary().
|
||||
|
@ -115,9 +132,9 @@ spec(Module, Options) ->
|
|||
lists:foldl(
|
||||
fun(Path, {AllAcc, AllRefsAcc}) ->
|
||||
{OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options),
|
||||
CheckSchema = support_check_schema(Options),
|
||||
Opts = #{filter => filter(Options)},
|
||||
{
|
||||
[{filename:join("/", Path), Specs, OperationId, CheckSchema} | AllAcc],
|
||||
[{filename:join("/", Path), Specs, OperationId, Opts} | AllAcc],
|
||||
Refs ++ AllRefsAcc
|
||||
}
|
||||
end,
|
||||
|
@ -204,6 +221,21 @@ file_schema(FileName) ->
|
|||
}
|
||||
}.
|
||||
|
||||
-spec compose_filters(filter(), filter()) -> filter().
|
||||
compose_filters(undefined, Filter2) ->
|
||||
Filter2;
|
||||
compose_filters(Filter1, undefined) ->
|
||||
Filter1;
|
||||
compose_filters(Filter1, Filter2) ->
|
||||
fun(Request, RequestMeta) ->
|
||||
case Filter1(Request, RequestMeta) of
|
||||
{ok, Request1} ->
|
||||
Filter2(Request1, RequestMeta);
|
||||
Response ->
|
||||
Response
|
||||
end
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Private functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -235,14 +267,22 @@ check_only(Schema, Map, Opts) ->
|
|||
_ = hocon_tconf:check_plain(Schema, Map, Opts),
|
||||
Map.
|
||||
|
||||
support_check_schema(#{check_schema := true, translate_body := true}) ->
|
||||
#{filter => fun ?MODULE:filter_check_request_and_translate_body/2};
|
||||
support_check_schema(#{check_schema := true}) ->
|
||||
#{filter => fun ?MODULE:filter_check_request/2};
|
||||
support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) ->
|
||||
#{filter => Filter};
|
||||
support_check_schema(_) ->
|
||||
#{filter => undefined}.
|
||||
filter(Options) ->
|
||||
CheckSchemaFilter = check_schema_filter(Options),
|
||||
CustomFilter = custom_filter(Options),
|
||||
compose_filters(CheckSchemaFilter, CustomFilter).
|
||||
|
||||
custom_filter(Options) ->
|
||||
maps:get(filter, Options, undefined).
|
||||
|
||||
check_schema_filter(#{check_schema := true, translate_body := true}) ->
|
||||
fun ?MODULE:filter_check_request_and_translate_body/2;
|
||||
check_schema_filter(#{check_schema := true}) ->
|
||||
fun ?MODULE:filter_check_request/2;
|
||||
check_schema_filter(#{check_schema := Filter}) when is_function(Filter, 2) ->
|
||||
Filter;
|
||||
check_schema_filter(_) ->
|
||||
undefined.
|
||||
|
||||
parse_spec_ref(Module, Path, Options) ->
|
||||
Schema =
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
minio
|
|
@ -1,15 +0,0 @@
|
|||
file_transfer {
|
||||
storage {
|
||||
type = local
|
||||
segments {
|
||||
root = "{{ platform_data_dir }}/file_transfer/segments"
|
||||
gc {
|
||||
interval = "1h"
|
||||
}
|
||||
}
|
||||
exporter {
|
||||
root = "{{ platform_data_dir }}/file_transfer/exports"
|
||||
type = local
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,6 +33,9 @@
|
|||
fields/1
|
||||
]).
|
||||
|
||||
%% Minirest filter for checking if file transfer is enabled
|
||||
-export([check_ft_enabled/2]).
|
||||
|
||||
%% API callbacks
|
||||
-export([
|
||||
'/file_transfer/files'/2,
|
||||
|
@ -44,7 +47,9 @@
|
|||
namespace() -> "file_transfer".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{
|
||||
check_schema => true, filter => fun ?MODULE:check_ft_enabled/2
|
||||
}).
|
||||
|
||||
paths() ->
|
||||
[
|
||||
|
@ -97,6 +102,14 @@ schema("/file_transfer/files/:clientid/:fileid") ->
|
|||
}
|
||||
}.
|
||||
|
||||
check_ft_enabled(Params, _Meta) ->
|
||||
case emqx_ft_conf:enabled() of
|
||||
true ->
|
||||
{ok, Params};
|
||||
false ->
|
||||
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
|
||||
end.
|
||||
|
||||
'/file_transfer/files'(get, #{
|
||||
query_string := QueryString
|
||||
}) ->
|
||||
|
|
|
@ -56,7 +56,7 @@ enabled() ->
|
|||
|
||||
-spec storage() -> _Storage.
|
||||
storage() ->
|
||||
emqx_config:get([file_transfer, storage], undefined).
|
||||
emqx_config:get([file_transfer, storage]).
|
||||
|
||||
-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
|
||||
gc_interval(Conf = #{type := local}) ->
|
||||
|
@ -88,11 +88,12 @@ store_segment_timeout() ->
|
|||
|
||||
-spec load() -> ok.
|
||||
load() ->
|
||||
ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
|
||||
ok = maybe_start(),
|
||||
emqx_conf:add_handler([file_transfer], ?MODULE).
|
||||
|
||||
-spec unload() -> ok.
|
||||
unload() ->
|
||||
ok = stop(),
|
||||
emqx_conf:remove_handler([file_transfer]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -115,23 +116,26 @@ pre_config_update(_, Req, _Config) ->
|
|||
post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
|
||||
on_config_update(OldConfig, NewConfig).
|
||||
|
||||
on_config_update(OldConfig, NewConfig) ->
|
||||
lists:foreach(
|
||||
fun(ConfKey) ->
|
||||
on_config_update(
|
||||
ConfKey,
|
||||
maps:get(ConfKey, OldConfig, undefined),
|
||||
maps:get(ConfKey, NewConfig, undefined)
|
||||
)
|
||||
end,
|
||||
[storage, enable]
|
||||
).
|
||||
|
||||
on_config_update(_, Config, Config) ->
|
||||
on_config_update(#{enable := false}, #{enable := false}) ->
|
||||
ok;
|
||||
on_config_update(storage, OldConfig, NewConfig) ->
|
||||
ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
|
||||
on_config_update(enable, _, true) ->
|
||||
on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) ->
|
||||
ok = emqx_ft_storage:on_config_update(OldStorage, undefined),
|
||||
ok = emqx_ft:unhook();
|
||||
on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) ->
|
||||
ok = emqx_ft_storage:on_config_update(undefined, NewStorage),
|
||||
ok = emqx_ft:hook();
|
||||
on_config_update(enable, _, false) ->
|
||||
ok = emqx_ft:unhook().
|
||||
on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) ->
|
||||
ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage).
|
||||
|
||||
maybe_start() ->
|
||||
case emqx_config:get([file_transfer]) of
|
||||
#{enable := true, storage := Storage} ->
|
||||
ok = emqx_ft_storage:on_config_update(undefined, Storage),
|
||||
ok = emqx_ft:hook();
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
stop() ->
|
||||
ok = emqx_ft:unhook(),
|
||||
ok = emqx_ft_storage:on_config_update(storage(), undefined).
|
||||
|
|
|
@ -90,6 +90,12 @@
|
|||
-callback files(storage(), query(Cursor)) ->
|
||||
{ok, page(file_info(), Cursor)} | {error, term()}.
|
||||
|
||||
-callback start(emqx_config:config()) -> any().
|
||||
-callback stop(emqx_config:config()) -> any().
|
||||
|
||||
-callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) ->
|
||||
any().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -167,49 +173,28 @@ apply_storage(Storage, Fun, Args) when is_function(Fun) ->
|
|||
|
||||
-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
|
||||
ok.
|
||||
on_config_update(Storage, Storage) ->
|
||||
on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
|
||||
ok;
|
||||
on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
|
||||
ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
|
||||
on_config_update(StorageOld, StorageNew) ->
|
||||
on_config_update(StorageOld, StorageNew) when
|
||||
(StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
|
||||
(StorageNew =:= undefined orelse is_map_key(type, StorageNew))
|
||||
->
|
||||
_ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
|
||||
_ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
|
||||
_ = emqx_maybe:apply(
|
||||
fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
|
||||
StorageNew
|
||||
),
|
||||
ok.
|
||||
|
||||
on_storage_start(Storage = #{type := _}) ->
|
||||
lists:foreach(
|
||||
fun(ChildSpec) ->
|
||||
{ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
|
||||
end,
|
||||
child_spec(Storage)
|
||||
).
|
||||
|
||||
on_storage_stop(Storage = #{type := _}) ->
|
||||
lists:foreach(
|
||||
fun(#{id := ChildId}) ->
|
||||
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
|
||||
ok = supervisor:delete_child(emqx_ft_sup, ChildId)
|
||||
end,
|
||||
child_spec(Storage)
|
||||
).
|
||||
|
||||
child_spec(Storage) ->
|
||||
try
|
||||
Mod = mod(Storage),
|
||||
Mod:child_spec(Storage)
|
||||
catch
|
||||
error:disabled -> [];
|
||||
error:undef -> []
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Local FS API
|
||||
%% Local API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
on_storage_start(Storage) ->
|
||||
(mod(Storage)):start(Storage).
|
||||
|
||||
on_storage_stop(Storage) ->
|
||||
(mod(Storage)):stop(Storage).
|
||||
|
||||
storage() ->
|
||||
emqx_ft_conf:storage().
|
||||
|
||||
|
|
|
@ -153,14 +153,14 @@ on_exporter_update(Config, Config) ->
|
|||
on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
|
||||
ExporterMod:update(ConfigOld, ConfigNew);
|
||||
on_exporter_update(ExporterOld, ExporterNew) ->
|
||||
_ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld),
|
||||
_ = emqx_maybe:apply(fun start_exporter/1, ExporterNew),
|
||||
_ = emqx_maybe:apply(fun stop/1, ExporterOld),
|
||||
_ = emqx_maybe:apply(fun start/1, ExporterNew),
|
||||
ok.
|
||||
|
||||
start_exporter({ExporterMod, ExporterOpts}) ->
|
||||
start({ExporterMod, ExporterOpts}) ->
|
||||
ok = ExporterMod:start(ExporterOpts).
|
||||
|
||||
stop_exporter({ExporterMod, ExporterOpts}) ->
|
||||
stop({ExporterMod, ExporterOpts}) ->
|
||||
ok = ExporterMod:stop(ExporterOpts).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -47,7 +47,9 @@
|
|||
namespace() -> "file_transfer".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{
|
||||
check_schema => true, filter => fun emqx_ft_api:check_ft_enabled/2
|
||||
}).
|
||||
|
||||
paths() ->
|
||||
[
|
||||
|
|
|
@ -149,7 +149,7 @@ list(Client, _Options, #{transfer := Transfer}) ->
|
|||
end;
|
||||
list(Client, _Options, Query) ->
|
||||
Limit = maps:get(limit, Query, undefined),
|
||||
Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)),
|
||||
Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(following, Query, undefined)),
|
||||
case list_pages(Client, Marker, Limit, []) of
|
||||
{ok, {Exports, undefined}} ->
|
||||
{ok, #{items => Exports}};
|
||||
|
|
|
@ -48,6 +48,8 @@
|
|||
-export([files/2]).
|
||||
|
||||
-export([on_config_update/2]).
|
||||
-export([start/1]).
|
||||
-export([stop/1]).
|
||||
|
||||
-export_type([storage/0]).
|
||||
-export_type([filefrag/1]).
|
||||
|
@ -227,6 +229,27 @@ on_config_update(StorageOld, StorageNew) ->
|
|||
ok = emqx_ft_storage_fs_gc:reset(StorageNew),
|
||||
emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
|
||||
|
||||
start(Storage) ->
|
||||
ok = lists:foreach(
|
||||
fun(ChildSpec) ->
|
||||
{ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
|
||||
end,
|
||||
child_spec(Storage)
|
||||
),
|
||||
ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage),
|
||||
ok.
|
||||
|
||||
stop(Storage) ->
|
||||
ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined),
|
||||
ok = lists:foreach(
|
||||
fun(#{id := ChildId}) ->
|
||||
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
|
||||
ok = supervisor:delete_child(emqx_ft_sup, ChildId)
|
||||
end,
|
||||
child_spec(Storage)
|
||||
),
|
||||
ok.
|
||||
|
||||
%%
|
||||
|
||||
-spec transfers(storage()) ->
|
||||
|
|
|
@ -83,6 +83,8 @@ mk_cluster_specs(Config) ->
|
|||
|
||||
init_per_testcase(Case, Config) ->
|
||||
[{tc, Case} | Config].
|
||||
end_per_testcase(t_ft_disabled, _Config) ->
|
||||
emqx_config:put([file_transfer, enable], true);
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
ok.
|
||||
|
||||
|
@ -223,6 +225,35 @@ t_list_files_paging(Config) ->
|
|||
?assertEqual(Files, PageThrough(#{limit => 8}, [])),
|
||||
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
|
||||
|
||||
t_ft_disabled(_Config) ->
|
||||
?assertMatch(
|
||||
{ok, 200, _},
|
||||
request_json(get, uri(["file_transfer", "files"]))
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 400, _},
|
||||
request_json(
|
||||
get,
|
||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
|
||||
)
|
||||
),
|
||||
|
||||
ok = emqx_config:put([file_transfer, enable], false),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 503, _},
|
||||
request_json(get, uri(["file_transfer", "files"]))
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
{ok, 503, _},
|
||||
request_json(
|
||||
get,
|
||||
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()})
|
||||
)
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -32,9 +32,10 @@ end_per_suite(_Config) ->
|
|||
ok.
|
||||
|
||||
init_per_testcase(_Case, Config) ->
|
||||
% NOTE: running each testcase with clean config
|
||||
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
|
||||
ok = emqx_common_test_helpers:start_apps(
|
||||
[emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
|
||||
),
|
||||
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
|
||||
Config.
|
||||
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ft_storage_exporter_s3_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
-define(assertS3Data(Data, Url),
|
||||
case httpc:request(Url) of
|
||||
{ok, {{_StatusLine, 200, "OK"}, _Headers, Body}} ->
|
||||
?assertEqual(Data, list_to_binary(Body), "S3 data mismatch");
|
||||
OtherResponse ->
|
||||
ct:fail("Unexpected response: ~p", [OtherResponse])
|
||||
end
|
||||
).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Config.
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
set_special_configs(Config) ->
|
||||
fun
|
||||
(emqx_ft) ->
|
||||
Storage = emqx_ft_test_helpers:local_storage(Config, #{
|
||||
exporter => s3, bucket_name => ?config(bucket_name, Config)
|
||||
}),
|
||||
emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage});
|
||||
(_) ->
|
||||
ok
|
||||
end.
|
||||
|
||||
init_per_testcase(Case, Config0) ->
|
||||
ClientId = atom_to_binary(Case),
|
||||
BucketName = create_bucket(),
|
||||
Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0],
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)),
|
||||
Config1.
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test Cases
|
||||
%%-------------------------------------------------------------------
|
||||
|
||||
t_happy_path(Config) ->
|
||||
ClientId = ?config(clientid, Config),
|
||||
|
||||
FileId = <<"🌚"/utf8>>,
|
||||
Name = "cool_name",
|
||||
Data = <<"data"/utf8>>,
|
||||
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
|
||||
),
|
||||
|
||||
{ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(),
|
||||
|
||||
?assertS3Data(Data, Uri),
|
||||
|
||||
Key = binary_to_list(ClientId) ++ "/" ++ binary_to_list(FileId) ++ "/" ++ Name,
|
||||
Meta = erlcloud_s3:get_object_metadata(
|
||||
?config(bucket_name, Config), Key, emqx_ft_test_helpers:aws_config()
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
ClientId,
|
||||
metadata_field("clientid", Meta)
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
FileId,
|
||||
metadata_field("fileid", Meta)
|
||||
),
|
||||
|
||||
NameBin = list_to_binary(Name),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"name">> := NameBin,
|
||||
<<"size">> := 4
|
||||
},
|
||||
emqx_utils_json:decode(metadata_field("filemeta", Meta), [return_maps])
|
||||
).
|
||||
|
||||
t_upload_error(Config) ->
|
||||
ClientId = ?config(clientid, Config),
|
||||
|
||||
FileId = <<"🌚"/utf8>>,
|
||||
Name = "cool_name",
|
||||
Data = <<"data"/utf8>>,
|
||||
|
||||
{ok, _} = emqx_conf:update(
|
||||
[file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{}
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
{error, unspecified_error},
|
||||
emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
|
||||
).
|
||||
|
||||
t_paging(Config) ->
|
||||
ClientId = ?config(clientid, Config),
|
||||
N = 1050,
|
||||
|
||||
FileId = fun integer_to_binary/1,
|
||||
Name = "cool_name",
|
||||
Data = fun integer_to_binary/1,
|
||||
|
||||
ok = lists:foreach(
|
||||
fun(I) ->
|
||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId(I), Name, Data(I))
|
||||
end,
|
||||
lists:seq(1, N)
|
||||
),
|
||||
|
||||
{ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(#{transfer => {ClientId, FileId(123)}}),
|
||||
|
||||
?assertS3Data(Data(123), Uri),
|
||||
|
||||
lists:foreach(
|
||||
fun(PageSize) ->
|
||||
Pages = file_pages(#{limit => PageSize}),
|
||||
?assertEqual(
|
||||
expected_page_count(PageSize, N),
|
||||
length(Pages)
|
||||
),
|
||||
FileIds = [
|
||||
FId
|
||||
|| #{transfer := {_, FId}} <- lists:concat(Pages)
|
||||
],
|
||||
?assertEqual(
|
||||
lists:sort([FileId(I) || I <- lists:seq(1, N)]),
|
||||
lists:sort(FileIds)
|
||||
)
|
||||
end,
|
||||
%% less than S3 limit, greater than S3 limit
|
||||
[20, 550]
|
||||
).
|
||||
|
||||
t_invalid_cursor(_Config) ->
|
||||
InvalidUtf8 = <<16#80>>,
|
||||
?assertError(
|
||||
{badarg, cursor},
|
||||
emqx_ft_storage:files(#{following => InvalidUtf8})
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
expected_page_count(PageSize, Total) ->
|
||||
case Total rem PageSize of
|
||||
0 -> Total div PageSize;
|
||||
_ -> Total div PageSize + 1
|
||||
end.
|
||||
|
||||
file_pages(Query) ->
|
||||
case emqx_ft_storage:files(Query) of
|
||||
{ok, #{items := Items, cursor := NewCursor}} ->
|
||||
[Items] ++ file_pages(Query#{following => NewCursor});
|
||||
{ok, #{items := Items}} ->
|
||||
[Items];
|
||||
{error, Error} ->
|
||||
ct:fail("Failed to download files: ~p", [Error])
|
||||
end.
|
||||
|
||||
metadata_field(Field, Meta) ->
|
||||
Key = "x-amz-meta-" ++ Field,
|
||||
case lists:keyfind(Key, 1, Meta) of
|
||||
{Key, Value} -> list_to_binary(Value);
|
||||
false -> false
|
||||
end.
|
||||
|
||||
create_bucket() ->
|
||||
BucketName = emqx_s3_test_helpers:unique_bucket(),
|
||||
_ = application:ensure_all_started(lhttpc),
|
||||
ok = erlcloud_s3:create_bucket(BucketName, emqx_ft_test_helpers:aws_config()),
|
||||
BucketName.
|
|
@ -84,7 +84,9 @@ client_id(Config) ->
|
|||
atom_to_binary(?config(tc, Config), utf8).
|
||||
|
||||
storage(Config) ->
|
||||
emqx_ft_test_helpers:local_storage(Config).
|
||||
RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
|
||||
#{storage := Storage} = emqx_ft_schema:translate(RawConfig),
|
||||
Storage.
|
||||
|
||||
list_files(Config) ->
|
||||
{ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}),
|
||||
|
|
|
@ -40,14 +40,15 @@ init_per_testcase(TC, Config) ->
|
|||
emqx_ft,
|
||||
fun(emqx_ft) ->
|
||||
emqx_ft_test_helpers:load_config(#{
|
||||
storage => #{
|
||||
type => local,
|
||||
segments => #{
|
||||
root => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
|
||||
<<"enable">> => true,
|
||||
<<"storage">> => #{
|
||||
<<"type">> => <<"local">>,
|
||||
<<"segments">> => #{
|
||||
<<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
|
||||
},
|
||||
exporter => #{
|
||||
type => local,
|
||||
root => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
|
||||
<<"exporter">> => #{
|
||||
<<"type">> => <<"local">>,
|
||||
<<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_ft]),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -21,6 +21,9 @@
|
|||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(S3_HOST, <<"minio">>).
|
||||
-define(S3_PORT, 9000).
|
||||
|
||||
start_additional_node(Config, Name) ->
|
||||
emqx_common_test_helpers:start_slave(
|
||||
Name,
|
||||
|
@ -41,32 +44,41 @@ stop_additional_node(Node) ->
|
|||
env_handler(Config) ->
|
||||
fun
|
||||
(emqx_ft) ->
|
||||
load_config(#{enable => true, storage => local_storage(Config)});
|
||||
load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)});
|
||||
(_) ->
|
||||
ok
|
||||
end.
|
||||
|
||||
local_storage(Config) ->
|
||||
local_storage(Config, #{exporter => local}).
|
||||
|
||||
local_storage(Config, Opts) ->
|
||||
#{
|
||||
type => local,
|
||||
segments => #{
|
||||
root => root(Config, node(), [segments])
|
||||
},
|
||||
exporter => #{
|
||||
type => local,
|
||||
root => root(Config, node(), [exports])
|
||||
}
|
||||
<<"type">> => <<"local">>,
|
||||
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
|
||||
<<"exporter">> => exporter(Config, Opts)
|
||||
}.
|
||||
|
||||
exporter(Config, #{exporter := local}) ->
|
||||
#{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
|
||||
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
|
||||
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
|
||||
BaseConfig#{
|
||||
<<"bucket">> => list_to_binary(BucketName),
|
||||
<<"type">> => <<"s3">>,
|
||||
<<"host">> => ?S3_HOST,
|
||||
<<"port">> => ?S3_PORT
|
||||
}.
|
||||
|
||||
load_config(Config) ->
|
||||
emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}).
|
||||
emqx_common_test_helpers:load_config(emqx_ft_schema, #{<<"file_transfer">> => Config}).
|
||||
|
||||
tcp_port(Node) ->
|
||||
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
|
||||
Port.
|
||||
|
||||
root(Config, Node, Tail) ->
|
||||
filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
|
||||
iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])).
|
||||
|
||||
start_client(ClientId) ->
|
||||
start_client(ClientId, node()).
|
||||
|
@ -94,9 +106,21 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
|
|||
ct:pal("MetaPayload = ~ts", [MetaPayload]),
|
||||
|
||||
MetaTopic = <<"$file/", FileId/binary, "/init">>,
|
||||
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
|
||||
{ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1),
|
||||
{ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
|
||||
{ok, #{reason_code_name := success}} = emqtt:publish(
|
||||
C1, <<"$file/", FileId/binary, "/0">>, Data, 1
|
||||
),
|
||||
|
||||
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
|
||||
{ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1),
|
||||
ok = emqtt:stop(C1).
|
||||
FinResult =
|
||||
case emqtt:publish(C1, FinTopic, <<>>, 1) of
|
||||
{ok, #{reason_code_name := success}} ->
|
||||
ok;
|
||||
{ok, #{reason_code_name := Error}} ->
|
||||
{error, Error}
|
||||
end,
|
||||
ok = emqtt:stop(C1),
|
||||
FinResult.
|
||||
|
||||
aws_config() ->
|
||||
emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).
|
||||
|
|
|
@ -36,19 +36,24 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
aws_config(tcp) ->
|
||||
aws_config(tcp, ?TCP_HOST, ?TCP_PORT);
|
||||
aws_config(tls) ->
|
||||
aws_config(tls, ?TLS_HOST, ?TLS_PORT).
|
||||
|
||||
aws_config(tcp, Host, Port) ->
|
||||
erlcloud_s3_new(
|
||||
?ACCESS_KEY_ID,
|
||||
?SECRET_ACCESS_KEY,
|
||||
?TCP_HOST,
|
||||
?TCP_PORT,
|
||||
Host,
|
||||
Port,
|
||||
"http://"
|
||||
);
|
||||
aws_config(tls) ->
|
||||
aws_config(tls, Host, Port) ->
|
||||
erlcloud_s3_new(
|
||||
?ACCESS_KEY_ID,
|
||||
?SECRET_ACCESS_KEY,
|
||||
?TLS_HOST,
|
||||
?TLS_PORT,
|
||||
Host,
|
||||
Port,
|
||||
"https://"
|
||||
).
|
||||
|
||||
|
|
|
@ -3,8 +3,8 @@ emqx_ft_schema {
|
|||
enable.desc:
|
||||
"""Enable the File Transfer feature.<br/>
|
||||
Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.<br/>
|
||||
This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
|
||||
on storage-dependent background activities (e.g. garbage collection)."""
|
||||
This toggle also affects the availability of the File Transfer REST API and
|
||||
storage-dependent background activities (e.g. garbage collection)."""
|
||||
|
||||
init_timeout.desc:
|
||||
"""Timeout for initializing the file transfer.<br/>
|
||||
|
|
Loading…
Reference in New Issue