chore(ft): add s3 exporter tests

This commit is contained in:
Ilya Averyanov 2023-04-27 23:23:52 +03:00
parent 4100c31d4a
commit cff7788b2e
19 changed files with 437 additions and 121 deletions

View File

@ -29,6 +29,7 @@
-export([file_schema/1]). -export([file_schema/1]).
-export([base_path/0]). -export([base_path/0]).
-export([relative_uri/1]). -export([relative_uri/1]).
-export([compose_filters/2]).
-export([filter_check_request/2, filter_check_request_and_translate_body/2]). -export([filter_check_request/2, filter_check_request_and_translate_body/2]).
@ -82,14 +83,30 @@
-type request() :: #{bindings => map(), query_string => map(), body => map()}. -type request() :: #{bindings => map(), query_string => map(), body => map()}.
-type request_meta() :: #{module => module(), path => string(), method => atom()}. -type request_meta() :: #{module => module(), path => string(), method => atom()}.
-type filter_result() :: {ok, request()} | {400, 'BAD_REQUEST', binary()}. %% More exact types are defined in minirest.hrl, but we don't want to include it
-type filter() :: fun((request(), request_meta()) -> filter_result()). %% because it defines a lot of types and they may clash with the types declared locally.
-type status_code() :: pos_integer().
-type error_code() :: atom() | binary().
-type error_message() :: binary().
-type response_body() :: term().
-type headers() :: map().
-type response() ::
status_code()
| {status_code()}
| {status_code(), response_body()}
| {status_code(), headers(), response_body()}
| {status_code(), error_code(), error_message()}.
-type filter_result() :: {ok, request()} | response().
-type filter() :: emqx_maybe:t(fun((request(), request_meta()) -> filter_result())).
-type spec_opts() :: #{ -type spec_opts() :: #{
check_schema => boolean() | filter(), check_schema => boolean() | filter(),
translate_body => boolean(), translate_body => boolean(),
schema_converter => fun((hocon_schema:schema(), Module :: atom()) -> map()), schema_converter => fun((hocon_schema:schema(), Module :: atom()) -> map()),
i18n_lang => atom() | string() | binary() i18n_lang => atom() | string() | binary(),
filter => filter()
}. }.
-type route_path() :: string() | binary(). -type route_path() :: string() | binary().
@ -115,9 +132,9 @@ spec(Module, Options) ->
lists:foldl( lists:foldl(
fun(Path, {AllAcc, AllRefsAcc}) -> fun(Path, {AllAcc, AllRefsAcc}) ->
{OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options), {OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options),
CheckSchema = support_check_schema(Options), Opts = #{filter => filter(Options)},
{ {
[{filename:join("/", Path), Specs, OperationId, CheckSchema} | AllAcc], [{filename:join("/", Path), Specs, OperationId, Opts} | AllAcc],
Refs ++ AllRefsAcc Refs ++ AllRefsAcc
} }
end, end,
@ -204,6 +221,21 @@ file_schema(FileName) ->
} }
}. }.
-spec compose_filters(filter(), filter()) -> filter().
compose_filters(undefined, Filter2) ->
Filter2;
compose_filters(Filter1, undefined) ->
Filter1;
compose_filters(Filter1, Filter2) ->
fun(Request, RequestMeta) ->
case Filter1(Request, RequestMeta) of
{ok, Request1} ->
Filter2(Request1, RequestMeta);
Response ->
Response
end
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Private functions %% Private functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -235,14 +267,22 @@ check_only(Schema, Map, Opts) ->
_ = hocon_tconf:check_plain(Schema, Map, Opts), _ = hocon_tconf:check_plain(Schema, Map, Opts),
Map. Map.
support_check_schema(#{check_schema := true, translate_body := true}) -> filter(Options) ->
#{filter => fun ?MODULE:filter_check_request_and_translate_body/2}; CheckSchemaFilter = check_schema_filter(Options),
support_check_schema(#{check_schema := true}) -> CustomFilter = custom_filter(Options),
#{filter => fun ?MODULE:filter_check_request/2}; compose_filters(CheckSchemaFilter, CustomFilter).
support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) ->
#{filter => Filter}; custom_filter(Options) ->
support_check_schema(_) -> maps:get(filter, Options, undefined).
#{filter => undefined}.
check_schema_filter(#{check_schema := true, translate_body := true}) ->
fun ?MODULE:filter_check_request_and_translate_body/2;
check_schema_filter(#{check_schema := true}) ->
fun ?MODULE:filter_check_request/2;
check_schema_filter(#{check_schema := Filter}) when is_function(Filter, 2) ->
Filter;
check_schema_filter(_) ->
undefined.
parse_spec_ref(Module, Path, Options) -> parse_spec_ref(Module, Path, Options) ->
Schema = Schema =

1
apps/emqx_ft/docker-ct Normal file
View File

@ -0,0 +1 @@
minio

View File

@ -1,15 +0,0 @@
file_transfer {
storage {
type = local
segments {
root = "{{ platform_data_dir }}/file_transfer/segments"
gc {
interval = "1h"
}
}
exporter {
root = "{{ platform_data_dir }}/file_transfer/exports"
type = local
}
}
}

View File

@ -33,6 +33,9 @@
fields/1 fields/1
]). ]).
%% Minirest filter for checking if file transfer is enabled
-export([check_ft_enabled/2]).
%% API callbacks %% API callbacks
-export([ -export([
'/file_transfer/files'/2, '/file_transfer/files'/2,
@ -44,7 +47,9 @@
namespace() -> "file_transfer". namespace() -> "file_transfer".
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{
check_schema => true, filter => fun ?MODULE:check_ft_enabled/2
}).
paths() -> paths() ->
[ [
@ -97,6 +102,14 @@ schema("/file_transfer/files/:clientid/:fileid") ->
} }
}. }.
check_ft_enabled(Params, _Meta) ->
case emqx_ft_conf:enabled() of
true ->
{ok, Params};
false ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
'/file_transfer/files'(get, #{ '/file_transfer/files'(get, #{
query_string := QueryString query_string := QueryString
}) -> }) ->

View File

@ -56,7 +56,7 @@ enabled() ->
-spec storage() -> _Storage. -spec storage() -> _Storage.
storage() -> storage() ->
emqx_config:get([file_transfer, storage], undefined). emqx_config:get([file_transfer, storage]).
-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()). -spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
gc_interval(Conf = #{type := local}) -> gc_interval(Conf = #{type := local}) ->
@ -88,11 +88,12 @@ store_segment_timeout() ->
-spec load() -> ok. -spec load() -> ok.
load() -> load() ->
ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})), ok = maybe_start(),
emqx_conf:add_handler([file_transfer], ?MODULE). emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok. -spec unload() -> ok.
unload() -> unload() ->
ok = stop(),
emqx_conf:remove_handler([file_transfer]). emqx_conf:remove_handler([file_transfer]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -115,23 +116,26 @@ pre_config_update(_, Req, _Config) ->
post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) -> post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
on_config_update(OldConfig, NewConfig). on_config_update(OldConfig, NewConfig).
on_config_update(OldConfig, NewConfig) -> on_config_update(#{enable := false}, #{enable := false}) ->
lists:foreach(
fun(ConfKey) ->
on_config_update(
ConfKey,
maps:get(ConfKey, OldConfig, undefined),
maps:get(ConfKey, NewConfig, undefined)
)
end,
[storage, enable]
).
on_config_update(_, Config, Config) ->
ok; ok;
on_config_update(storage, OldConfig, NewConfig) -> on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) ->
ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig); ok = emqx_ft_storage:on_config_update(OldStorage, undefined),
on_config_update(enable, _, true) -> ok = emqx_ft:unhook();
on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) ->
ok = emqx_ft_storage:on_config_update(undefined, NewStorage),
ok = emqx_ft:hook(); ok = emqx_ft:hook();
on_config_update(enable, _, false) -> on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) ->
ok = emqx_ft:unhook(). ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage).
maybe_start() ->
case emqx_config:get([file_transfer]) of
#{enable := true, storage := Storage} ->
ok = emqx_ft_storage:on_config_update(undefined, Storage),
ok = emqx_ft:hook();
_ ->
ok
end.
stop() ->
ok = emqx_ft:unhook(),
ok = emqx_ft_storage:on_config_update(storage(), undefined).

View File

@ -90,6 +90,12 @@
-callback files(storage(), query(Cursor)) -> -callback files(storage(), query(Cursor)) ->
{ok, page(file_info(), Cursor)} | {error, term()}. {ok, page(file_info(), Cursor)} | {error, term()}.
-callback start(emqx_config:config()) -> any().
-callback stop(emqx_config:config()) -> any().
-callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) ->
any().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -167,49 +173,28 @@ apply_storage(Storage, Fun, Args) when is_function(Fun) ->
-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) -> -spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
ok. ok.
on_config_update(Storage, Storage) -> on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
ok; ok;
on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) -> on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew); ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
on_config_update(StorageOld, StorageNew) -> on_config_update(StorageOld, StorageNew) when
(StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
(StorageNew =:= undefined orelse is_map_key(type, StorageNew))
->
_ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld), _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
_ = emqx_maybe:apply(fun on_storage_start/1, StorageNew), _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
_ = emqx_maybe:apply(
fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
StorageNew
),
ok. ok.
on_storage_start(Storage = #{type := _}) ->
lists:foreach(
fun(ChildSpec) ->
{ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
end,
child_spec(Storage)
).
on_storage_stop(Storage = #{type := _}) ->
lists:foreach(
fun(#{id := ChildId}) ->
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
ok = supervisor:delete_child(emqx_ft_sup, ChildId)
end,
child_spec(Storage)
).
child_spec(Storage) ->
try
Mod = mod(Storage),
Mod:child_spec(Storage)
catch
error:disabled -> [];
error:undef -> []
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Local FS API %% Local API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
on_storage_start(Storage) ->
(mod(Storage)):start(Storage).
on_storage_stop(Storage) ->
(mod(Storage)):stop(Storage).
storage() -> storage() ->
emqx_ft_conf:storage(). emqx_ft_conf:storage().

View File

@ -153,14 +153,14 @@ on_exporter_update(Config, Config) ->
on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) -> on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
ExporterMod:update(ConfigOld, ConfigNew); ExporterMod:update(ConfigOld, ConfigNew);
on_exporter_update(ExporterOld, ExporterNew) -> on_exporter_update(ExporterOld, ExporterNew) ->
_ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld), _ = emqx_maybe:apply(fun stop/1, ExporterOld),
_ = emqx_maybe:apply(fun start_exporter/1, ExporterNew), _ = emqx_maybe:apply(fun start/1, ExporterNew),
ok. ok.
start_exporter({ExporterMod, ExporterOpts}) -> start({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:start(ExporterOpts). ok = ExporterMod:start(ExporterOpts).
stop_exporter({ExporterMod, ExporterOpts}) -> stop({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:stop(ExporterOpts). ok = ExporterMod:stop(ExporterOpts).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -47,7 +47,9 @@
namespace() -> "file_transfer". namespace() -> "file_transfer".
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{
check_schema => true, filter => fun emqx_ft_api:check_ft_enabled/2
}).
paths() -> paths() ->
[ [

View File

@ -149,7 +149,7 @@ list(Client, _Options, #{transfer := Transfer}) ->
end; end;
list(Client, _Options, Query) -> list(Client, _Options, Query) ->
Limit = maps:get(limit, Query, undefined), Limit = maps:get(limit, Query, undefined),
Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)), Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(following, Query, undefined)),
case list_pages(Client, Marker, Limit, []) of case list_pages(Client, Marker, Limit, []) of
{ok, {Exports, undefined}} -> {ok, {Exports, undefined}} ->
{ok, #{items => Exports}}; {ok, #{items => Exports}};

View File

@ -48,6 +48,8 @@
-export([files/2]). -export([files/2]).
-export([on_config_update/2]). -export([on_config_update/2]).
-export([start/1]).
-export([stop/1]).
-export_type([storage/0]). -export_type([storage/0]).
-export_type([filefrag/1]). -export_type([filefrag/1]).
@ -227,6 +229,27 @@ on_config_update(StorageOld, StorageNew) ->
ok = emqx_ft_storage_fs_gc:reset(StorageNew), ok = emqx_ft_storage_fs_gc:reset(StorageNew),
emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew). emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
start(Storage) ->
ok = lists:foreach(
fun(ChildSpec) ->
{ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
end,
child_spec(Storage)
),
ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage),
ok.
stop(Storage) ->
ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined),
ok = lists:foreach(
fun(#{id := ChildId}) ->
_ = supervisor:terminate_child(emqx_ft_sup, ChildId),
ok = supervisor:delete_child(emqx_ft_sup, ChildId)
end,
child_spec(Storage)
),
ok.
%% %%
-spec transfers(storage()) -> -spec transfers(storage()) ->

View File

@ -83,6 +83,8 @@ mk_cluster_specs(Config) ->
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
[{tc, Case} | Config]. [{tc, Case} | Config].
end_per_testcase(t_ft_disabled, _Config) ->
emqx_config:put([file_transfer, enable], true);
end_per_testcase(_Case, _Config) -> end_per_testcase(_Case, _Config) ->
ok. ok.
@ -223,6 +225,35 @@ t_list_files_paging(Config) ->
?assertEqual(Files, PageThrough(#{limit => 8}, [])), ?assertEqual(Files, PageThrough(#{limit => 8}, [])),
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])). ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
t_ft_disabled(_Config) ->
?assertMatch(
{ok, 200, _},
request_json(get, uri(["file_transfer", "files"]))
),
?assertMatch(
{ok, 400, _},
request_json(
get,
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
)
),
ok = emqx_config:put([file_transfer, enable], false),
?assertMatch(
{ok, 503, _},
request_json(get, uri(["file_transfer", "files"]))
),
?assertMatch(
{ok, 503, _},
request_json(
get,
uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()})
)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -32,9 +32,10 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_Case, Config) -> init_per_testcase(_Case, Config) ->
% NOTE: running each testcase with clean config
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end), ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
),
{ok, _} = emqx:update_config([rpc, port_discovery], manual), {ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config. Config.

View File

@ -0,0 +1,199 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_exporter_s3_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(assertS3Data(Data, Url),
case httpc:request(Url) of
{ok, {{_StatusLine, 200, "OK"}, _Headers, Body}} ->
?assertEqual(Data, list_to_binary(Body), "S3 data mismatch");
OtherResponse ->
ct:fail("Unexpected response: ~p", [OtherResponse])
end
).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
set_special_configs(Config) ->
fun
(emqx_ft) ->
Storage = emqx_ft_test_helpers:local_storage(Config, #{
exporter => s3, bucket_name => ?config(bucket_name, Config)
}),
emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage});
(_) ->
ok
end.
init_per_testcase(Case, Config0) ->
ClientId = atom_to_binary(Case),
BucketName = create_bucket(),
Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0],
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)),
Config1.
end_per_testcase(_Case, _Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
ok.
%%--------------------------------------------------------------------
%% Test Cases
%%-------------------------------------------------------------------
t_happy_path(Config) ->
ClientId = ?config(clientid, Config),
FileId = <<"🌚"/utf8>>,
Name = "cool_name",
Data = <<"data"/utf8>>,
?assertEqual(
ok,
emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
),
{ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(),
?assertS3Data(Data, Uri),
Key = binary_to_list(ClientId) ++ "/" ++ binary_to_list(FileId) ++ "/" ++ Name,
Meta = erlcloud_s3:get_object_metadata(
?config(bucket_name, Config), Key, emqx_ft_test_helpers:aws_config()
),
?assertEqual(
ClientId,
metadata_field("clientid", Meta)
),
?assertEqual(
FileId,
metadata_field("fileid", Meta)
),
NameBin = list_to_binary(Name),
?assertMatch(
#{
<<"name">> := NameBin,
<<"size">> := 4
},
emqx_utils_json:decode(metadata_field("filemeta", Meta), [return_maps])
).
t_upload_error(Config) ->
ClientId = ?config(clientid, Config),
FileId = <<"🌚"/utf8>>,
Name = "cool_name",
Data = <<"data"/utf8>>,
{ok, _} = emqx_conf:update(
[file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{}
),
?assertEqual(
{error, unspecified_error},
emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
).
t_paging(Config) ->
ClientId = ?config(clientid, Config),
N = 1050,
FileId = fun integer_to_binary/1,
Name = "cool_name",
Data = fun integer_to_binary/1,
ok = lists:foreach(
fun(I) ->
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId(I), Name, Data(I))
end,
lists:seq(1, N)
),
{ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(#{transfer => {ClientId, FileId(123)}}),
?assertS3Data(Data(123), Uri),
lists:foreach(
fun(PageSize) ->
Pages = file_pages(#{limit => PageSize}),
?assertEqual(
expected_page_count(PageSize, N),
length(Pages)
),
FileIds = [
FId
|| #{transfer := {_, FId}} <- lists:concat(Pages)
],
?assertEqual(
lists:sort([FileId(I) || I <- lists:seq(1, N)]),
lists:sort(FileIds)
)
end,
%% less than S3 limit, greater than S3 limit
[20, 550]
).
t_invalid_cursor(_Config) ->
InvalidUtf8 = <<16#80>>,
?assertError(
{badarg, cursor},
emqx_ft_storage:files(#{following => InvalidUtf8})
).
%%--------------------------------------------------------------------
%% Helper Functions
%%--------------------------------------------------------------------
expected_page_count(PageSize, Total) ->
case Total rem PageSize of
0 -> Total div PageSize;
_ -> Total div PageSize + 1
end.
file_pages(Query) ->
case emqx_ft_storage:files(Query) of
{ok, #{items := Items, cursor := NewCursor}} ->
[Items] ++ file_pages(Query#{following => NewCursor});
{ok, #{items := Items}} ->
[Items];
{error, Error} ->
ct:fail("Failed to download files: ~p", [Error])
end.
metadata_field(Field, Meta) ->
Key = "x-amz-meta-" ++ Field,
case lists:keyfind(Key, 1, Meta) of
{Key, Value} -> list_to_binary(Value);
false -> false
end.
create_bucket() ->
BucketName = emqx_s3_test_helpers:unique_bucket(),
_ = application:ensure_all_started(lhttpc),
ok = erlcloud_s3:create_bucket(BucketName, emqx_ft_test_helpers:aws_config()),
BucketName.

View File

@ -84,7 +84,9 @@ client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8). atom_to_binary(?config(tc, Config), utf8).
storage(Config) -> storage(Config) ->
emqx_ft_test_helpers:local_storage(Config). RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
#{storage := Storage} = emqx_ft_schema:translate(RawConfig),
Storage.
list_files(Config) -> list_files(Config) ->
{ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}), {ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}),

View File

@ -40,14 +40,15 @@ init_per_testcase(TC, Config) ->
emqx_ft, emqx_ft,
fun(emqx_ft) -> fun(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{ emqx_ft_test_helpers:load_config(#{
storage => #{ <<"enable">> => true,
type => local, <<"storage">> => #{
segments => #{ <<"type">> => <<"local">>,
root => emqx_ft_test_helpers:root(Config, node(), [TC, segments]) <<"segments">> => #{
<<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
}, },
exporter => #{ <<"exporter">> => #{
type => local, <<"type">> => <<"local">>,
root => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
} }
} }
}) })

View File

@ -25,7 +25,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_ft]), ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@ -21,6 +21,9 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(S3_HOST, <<"minio">>).
-define(S3_PORT, 9000).
start_additional_node(Config, Name) -> start_additional_node(Config, Name) ->
emqx_common_test_helpers:start_slave( emqx_common_test_helpers:start_slave(
Name, Name,
@ -41,32 +44,41 @@ stop_additional_node(Node) ->
env_handler(Config) -> env_handler(Config) ->
fun fun
(emqx_ft) -> (emqx_ft) ->
load_config(#{enable => true, storage => local_storage(Config)}); load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)});
(_) -> (_) ->
ok ok
end. end.
local_storage(Config) -> local_storage(Config) ->
local_storage(Config, #{exporter => local}).
local_storage(Config, Opts) ->
#{ #{
type => local, <<"type">> => <<"local">>,
segments => #{ <<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
root => root(Config, node(), [segments]) <<"exporter">> => exporter(Config, Opts)
}, }.
exporter => #{
type => local, exporter(Config, #{exporter := local}) ->
root => root(Config, node(), [exports]) #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
} exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
BaseConfig#{
<<"bucket">> => list_to_binary(BucketName),
<<"type">> => <<"s3">>,
<<"host">> => ?S3_HOST,
<<"port">> => ?S3_PORT
}. }.
load_config(Config) -> load_config(Config) ->
emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}). emqx_common_test_helpers:load_config(emqx_ft_schema, #{<<"file_transfer">> => Config}).
tcp_port(Node) -> tcp_port(Node) ->
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), {_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
Port. Port.
root(Config, Node, Tail) -> root(Config, Node, Tail) ->
filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]). iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])).
start_client(ClientId) -> start_client(ClientId) ->
start_client(ClientId, node()). start_client(ClientId, node()).
@ -94,9 +106,21 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
ct:pal("MetaPayload = ~ts", [MetaPayload]), ct:pal("MetaPayload = ~ts", [MetaPayload]),
MetaTopic = <<"$file/", FileId/binary, "/init">>, MetaTopic = <<"$file/", FileId/binary, "/init">>,
{ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
{ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), {ok, #{reason_code_name := success}} = emqtt:publish(
C1, <<"$file/", FileId/binary, "/0">>, Data, 1
),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
{ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), FinResult =
ok = emqtt:stop(C1). case emqtt:publish(C1, FinTopic, <<>>, 1) of
{ok, #{reason_code_name := success}} ->
ok;
{ok, #{reason_code_name := Error}} ->
{error, Error}
end,
ok = emqtt:stop(C1),
FinResult.
aws_config() ->
emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).

View File

@ -36,19 +36,24 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
aws_config(tcp) -> aws_config(tcp) ->
aws_config(tcp, ?TCP_HOST, ?TCP_PORT);
aws_config(tls) ->
aws_config(tls, ?TLS_HOST, ?TLS_PORT).
aws_config(tcp, Host, Port) ->
erlcloud_s3_new( erlcloud_s3_new(
?ACCESS_KEY_ID, ?ACCESS_KEY_ID,
?SECRET_ACCESS_KEY, ?SECRET_ACCESS_KEY,
?TCP_HOST, Host,
?TCP_PORT, Port,
"http://" "http://"
); );
aws_config(tls) -> aws_config(tls, Host, Port) ->
erlcloud_s3_new( erlcloud_s3_new(
?ACCESS_KEY_ID, ?ACCESS_KEY_ID,
?SECRET_ACCESS_KEY, ?SECRET_ACCESS_KEY,
?TLS_HOST, Host,
?TLS_PORT, Port,
"https://" "https://"
). ).

View File

@ -3,8 +3,8 @@ emqx_ft_schema {
enable.desc: enable.desc:
"""Enable the File Transfer feature.<br/> """Enable the File Transfer feature.<br/>
Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.<br/> Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.<br/>
This toggle does not have an effect neither on the availability of the File Transfer REST API, nor This toggle also affects the availability of the File Transfer REST API and
on storage-dependent background activities (e.g. garbage collection).""" storage-dependent background activities (e.g. garbage collection)."""
init_timeout.desc: init_timeout.desc:
"""Timeout for initializing the file transfer.<br/> """Timeout for initializing the file transfer.<br/>