diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index e42d9c3ae..6422d627c 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -29,6 +29,7 @@ -export([file_schema/1]). -export([base_path/0]). -export([relative_uri/1]). +-export([compose_filters/2]). -export([filter_check_request/2, filter_check_request_and_translate_body/2]). @@ -82,14 +83,30 @@ -type request() :: #{bindings => map(), query_string => map(), body => map()}. -type request_meta() :: #{module => module(), path => string(), method => atom()}. --type filter_result() :: {ok, request()} | {400, 'BAD_REQUEST', binary()}. --type filter() :: fun((request(), request_meta()) -> filter_result()). +%% More exact types are defined in minirest.hrl, but we don't want to include it +%% because it defines a lot of types and they may clash with the types declared locally. +-type status_code() :: pos_integer(). +-type error_code() :: atom() | binary(). +-type error_message() :: binary(). +-type response_body() :: term(). +-type headers() :: map(). + +-type response() :: + status_code() + | {status_code()} + | {status_code(), response_body()} + | {status_code(), headers(), response_body()} + | {status_code(), error_code(), error_message()}. + +-type filter_result() :: {ok, request()} | response(). +-type filter() :: emqx_maybe:t(fun((request(), request_meta()) -> filter_result())). -type spec_opts() :: #{ check_schema => boolean() | filter(), translate_body => boolean(), schema_converter => fun((hocon_schema:schema(), Module :: atom()) -> map()), - i18n_lang => atom() | string() | binary() + i18n_lang => atom() | string() | binary(), + filter => filter() }. -type route_path() :: string() | binary(). @@ -115,9 +132,9 @@ spec(Module, Options) -> lists:foldl( fun(Path, {AllAcc, AllRefsAcc}) -> {OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options), - CheckSchema = support_check_schema(Options), + Opts = #{filter => filter(Options)}, { - [{filename:join("/", Path), Specs, OperationId, CheckSchema} | AllAcc], + [{filename:join("/", Path), Specs, OperationId, Opts} | AllAcc], Refs ++ AllRefsAcc } end, @@ -204,6 +221,21 @@ file_schema(FileName) -> } }. +-spec compose_filters(filter(), filter()) -> filter(). +compose_filters(undefined, Filter2) -> + Filter2; +compose_filters(Filter1, undefined) -> + Filter1; +compose_filters(Filter1, Filter2) -> + fun(Request, RequestMeta) -> + case Filter1(Request, RequestMeta) of + {ok, Request1} -> + Filter2(Request1, RequestMeta); + Response -> + Response + end + end. + %%------------------------------------------------------------------------------ %% Private functions %%------------------------------------------------------------------------------ @@ -235,14 +267,22 @@ check_only(Schema, Map, Opts) -> _ = hocon_tconf:check_plain(Schema, Map, Opts), Map. -support_check_schema(#{check_schema := true, translate_body := true}) -> - #{filter => fun ?MODULE:filter_check_request_and_translate_body/2}; -support_check_schema(#{check_schema := true}) -> - #{filter => fun ?MODULE:filter_check_request/2}; -support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) -> - #{filter => Filter}; -support_check_schema(_) -> - #{filter => undefined}. +filter(Options) -> + CheckSchemaFilter = check_schema_filter(Options), + CustomFilter = custom_filter(Options), + compose_filters(CheckSchemaFilter, CustomFilter). + +custom_filter(Options) -> + maps:get(filter, Options, undefined). + +check_schema_filter(#{check_schema := true, translate_body := true}) -> + fun ?MODULE:filter_check_request_and_translate_body/2; +check_schema_filter(#{check_schema := true}) -> + fun ?MODULE:filter_check_request/2; +check_schema_filter(#{check_schema := Filter}) when is_function(Filter, 2) -> + Filter; +check_schema_filter(_) -> + undefined. parse_spec_ref(Module, Path, Options) -> Schema = diff --git a/apps/emqx_ft/docker-ct b/apps/emqx_ft/docker-ct new file mode 100644 index 000000000..36f9d86d3 --- /dev/null +++ b/apps/emqx_ft/docker-ct @@ -0,0 +1 @@ +minio diff --git a/apps/emqx_ft/etc/emqx_ft.conf b/apps/emqx_ft/etc/emqx_ft.conf index ec48d4e69..e69de29bb 100644 --- a/apps/emqx_ft/etc/emqx_ft.conf +++ b/apps/emqx_ft/etc/emqx_ft.conf @@ -1,15 +0,0 @@ -file_transfer { - storage { - type = local - segments { - root = "{{ platform_data_dir }}/file_transfer/segments" - gc { - interval = "1h" - } - } - exporter { - root = "{{ platform_data_dir }}/file_transfer/exports" - type = local - } - } -} diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl index 7e1ed97ad..61a6a2c93 100644 --- a/apps/emqx_ft/src/emqx_ft_api.erl +++ b/apps/emqx_ft/src/emqx_ft_api.erl @@ -33,6 +33,9 @@ fields/1 ]). +%% Minirest filter for checking if file transfer is enabled +-export([check_ft_enabled/2]). + %% API callbacks -export([ '/file_transfer/files'/2, @@ -44,7 +47,9 @@ namespace() -> "file_transfer". api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + emqx_dashboard_swagger:spec(?MODULE, #{ + check_schema => true, filter => fun ?MODULE:check_ft_enabled/2 + }). paths() -> [ @@ -97,6 +102,14 @@ schema("/file_transfer/files/:clientid/:fileid") -> } }. +check_ft_enabled(Params, _Meta) -> + case emqx_ft_conf:enabled() of + true -> + {ok, Params}; + false -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)} + end. + '/file_transfer/files'(get, #{ query_string := QueryString }) -> diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 90b59c8d1..61f639271 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -56,7 +56,7 @@ enabled() -> -spec storage() -> _Storage. storage() -> - emqx_config:get([file_transfer, storage], undefined). + emqx_config:get([file_transfer, storage]). -spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()). gc_interval(Conf = #{type := local}) -> @@ -88,11 +88,12 @@ store_segment_timeout() -> -spec load() -> ok. load() -> - ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})), + ok = maybe_start(), emqx_conf:add_handler([file_transfer], ?MODULE). -spec unload() -> ok. unload() -> + ok = stop(), emqx_conf:remove_handler([file_transfer]). %%-------------------------------------------------------------------- @@ -115,23 +116,26 @@ pre_config_update(_, Req, _Config) -> post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) -> on_config_update(OldConfig, NewConfig). -on_config_update(OldConfig, NewConfig) -> - lists:foreach( - fun(ConfKey) -> - on_config_update( - ConfKey, - maps:get(ConfKey, OldConfig, undefined), - maps:get(ConfKey, NewConfig, undefined) - ) - end, - [storage, enable] - ). - -on_config_update(_, Config, Config) -> +on_config_update(#{enable := false}, #{enable := false}) -> ok; -on_config_update(storage, OldConfig, NewConfig) -> - ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig); -on_config_update(enable, _, true) -> +on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) -> + ok = emqx_ft_storage:on_config_update(OldStorage, undefined), + ok = emqx_ft:unhook(); +on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) -> + ok = emqx_ft_storage:on_config_update(undefined, NewStorage), ok = emqx_ft:hook(); -on_config_update(enable, _, false) -> - ok = emqx_ft:unhook(). +on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) -> + ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage). + +maybe_start() -> + case emqx_config:get([file_transfer]) of + #{enable := true, storage := Storage} -> + ok = emqx_ft_storage:on_config_update(undefined, Storage), + ok = emqx_ft:hook(); + _ -> + ok + end. + +stop() -> + ok = emqx_ft:unhook(), + ok = emqx_ft_storage:on_config_update(storage(), undefined). diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 5364211a4..5ec342585 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -90,6 +90,12 @@ -callback files(storage(), query(Cursor)) -> {ok, page(file_info(), Cursor)} | {error, term()}. +-callback start(emqx_config:config()) -> any(). +-callback stop(emqx_config:config()) -> any(). + +-callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) -> + any(). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -167,49 +173,28 @@ apply_storage(Storage, Fun, Args) when is_function(Fun) -> -spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) -> ok. -on_config_update(Storage, Storage) -> +on_config_update(#{type := _} = Storage, #{type := _} = Storage) -> ok; on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) -> ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew); -on_config_update(StorageOld, StorageNew) -> +on_config_update(StorageOld, StorageNew) when + (StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso + (StorageNew =:= undefined orelse is_map_key(type, StorageNew)) +-> _ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld), _ = emqx_maybe:apply(fun on_storage_start/1, StorageNew), - _ = emqx_maybe:apply( - fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end, - StorageNew - ), ok. -on_storage_start(Storage = #{type := _}) -> - lists:foreach( - fun(ChildSpec) -> - {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec) - end, - child_spec(Storage) - ). - -on_storage_stop(Storage = #{type := _}) -> - lists:foreach( - fun(#{id := ChildId}) -> - _ = supervisor:terminate_child(emqx_ft_sup, ChildId), - ok = supervisor:delete_child(emqx_ft_sup, ChildId) - end, - child_spec(Storage) - ). - -child_spec(Storage) -> - try - Mod = mod(Storage), - Mod:child_spec(Storage) - catch - error:disabled -> []; - error:undef -> [] - end. - %%-------------------------------------------------------------------- -%% Local FS API +%% Local API %%-------------------------------------------------------------------- +on_storage_start(Storage) -> + (mod(Storage)):start(Storage). + +on_storage_stop(Storage) -> + (mod(Storage)):stop(Storage). + storage() -> emqx_ft_conf:storage(). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index fb44093c1..e000fe5c6 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -153,14 +153,14 @@ on_exporter_update(Config, Config) -> on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) -> ExporterMod:update(ConfigOld, ConfigNew); on_exporter_update(ExporterOld, ExporterNew) -> - _ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld), - _ = emqx_maybe:apply(fun start_exporter/1, ExporterNew), + _ = emqx_maybe:apply(fun stop/1, ExporterOld), + _ = emqx_maybe:apply(fun start/1, ExporterNew), ok. -start_exporter({ExporterMod, ExporterOpts}) -> +start({ExporterMod, ExporterOpts}) -> ok = ExporterMod:start(ExporterOpts). -stop_exporter({ExporterMod, ExporterOpts}) -> +stop({ExporterMod, ExporterOpts}) -> ok = ExporterMod:stop(ExporterOpts). %%------------------------------------------------------------------------------ diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl index f1a8c6dae..a278b01a5 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -47,7 +47,9 @@ namespace() -> "file_transfer". api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + emqx_dashboard_swagger:spec(?MODULE, #{ + check_schema => true, filter => fun emqx_ft_api:check_ft_enabled/2 + }). paths() -> [ diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index c7110c74a..5c5aade86 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -149,7 +149,7 @@ list(Client, _Options, #{transfer := Transfer}) -> end; list(Client, _Options, Query) -> Limit = maps:get(limit, Query, undefined), - Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)), + Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(following, Query, undefined)), case list_pages(Client, Marker, Limit, []) of {ok, {Exports, undefined}} -> {ok, #{items => Exports}}; diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index bd88727c0..cdc86d218 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -48,6 +48,8 @@ -export([files/2]). -export([on_config_update/2]). +-export([start/1]). +-export([stop/1]). -export_type([storage/0]). -export_type([filefrag/1]). @@ -227,6 +229,27 @@ on_config_update(StorageOld, StorageNew) -> ok = emqx_ft_storage_fs_gc:reset(StorageNew), emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew). +start(Storage) -> + ok = lists:foreach( + fun(ChildSpec) -> + {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec) + end, + child_spec(Storage) + ), + ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage), + ok. + +stop(Storage) -> + ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined), + ok = lists:foreach( + fun(#{id := ChildId}) -> + _ = supervisor:terminate_child(emqx_ft_sup, ChildId), + ok = supervisor:delete_child(emqx_ft_sup, ChildId) + end, + child_spec(Storage) + ), + ok. + %% -spec transfers(storage()) -> diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 4efa31205..f69e13a6d 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -83,6 +83,8 @@ mk_cluster_specs(Config) -> init_per_testcase(Case, Config) -> [{tc, Case} | Config]. +end_per_testcase(t_ft_disabled, _Config) -> + emqx_config:put([file_transfer, enable], true); end_per_testcase(_Case, _Config) -> ok. @@ -223,6 +225,35 @@ t_list_files_paging(Config) -> ?assertEqual(Files, PageThrough(#{limit => 8}, [])), ?assertEqual(Files, PageThrough(#{limit => NFiles}, [])). +t_ft_disabled(_Config) -> + ?assertMatch( + {ok, 200, _}, + request_json(get, uri(["file_transfer", "files"])) + ), + + ?assertMatch( + {ok, 400, _}, + request_json( + get, + uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>}) + ) + ), + + ok = emqx_config:put([file_transfer, enable], false), + + ?assertMatch( + {ok, 503, _}, + request_json(get, uri(["file_transfer", "files"])) + ), + + ?assertMatch( + {ok, 503, _}, + request_json( + get, + uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()}) + ) + ). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 106c34702..bc9eb5d98 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -32,9 +32,10 @@ end_per_suite(_Config) -> ok. init_per_testcase(_Case, Config) -> - % NOTE: running each testcase with clean config _ = emqx_config:save_schema_mod_and_names(emqx_ft_schema), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end), + ok = emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config) + ), {ok, _} = emqx:update_config([rpc, port_discovery], manual), Config. diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl new file mode 100644 index 000000000..86de74ee0 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl @@ -0,0 +1,199 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_exporter_s3_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(assertS3Data(Data, Url), + case httpc:request(Url) of + {ok, {{_StatusLine, 200, "OK"}, _Headers, Body}} -> + ?assertEqual(Data, list_to_binary(Body), "S3 data mismatch"); + OtherResponse -> + ct:fail("Unexpected response: ~p", [OtherResponse]) + end +). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. +end_per_suite(_Config) -> + ok. + +set_special_configs(Config) -> + fun + (emqx_ft) -> + Storage = emqx_ft_test_helpers:local_storage(Config, #{ + exporter => s3, bucket_name => ?config(bucket_name, Config) + }), + emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage}); + (_) -> + ok + end. + +init_per_testcase(Case, Config0) -> + ClientId = atom_to_binary(Case), + BucketName = create_bucket(), + Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0], + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)), + Config1. +end_per_testcase(_Case, _Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + ok. + +%%-------------------------------------------------------------------- +%% Test Cases +%%------------------------------------------------------------------- + +t_happy_path(Config) -> + ClientId = ?config(clientid, Config), + + FileId = <<"🌚"/utf8>>, + Name = "cool_name", + Data = <<"data"/utf8>>, + + ?assertEqual( + ok, + emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data) + ), + + {ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(), + + ?assertS3Data(Data, Uri), + + Key = binary_to_list(ClientId) ++ "/" ++ binary_to_list(FileId) ++ "/" ++ Name, + Meta = erlcloud_s3:get_object_metadata( + ?config(bucket_name, Config), Key, emqx_ft_test_helpers:aws_config() + ), + + ?assertEqual( + ClientId, + metadata_field("clientid", Meta) + ), + + ?assertEqual( + FileId, + metadata_field("fileid", Meta) + ), + + NameBin = list_to_binary(Name), + ?assertMatch( + #{ + <<"name">> := NameBin, + <<"size">> := 4 + }, + emqx_utils_json:decode(metadata_field("filemeta", Meta), [return_maps]) + ). + +t_upload_error(Config) -> + ClientId = ?config(clientid, Config), + + FileId = <<"🌚"/utf8>>, + Name = "cool_name", + Data = <<"data"/utf8>>, + + {ok, _} = emqx_conf:update( + [file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{} + ), + + ?assertEqual( + {error, unspecified_error}, + emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data) + ). + +t_paging(Config) -> + ClientId = ?config(clientid, Config), + N = 1050, + + FileId = fun integer_to_binary/1, + Name = "cool_name", + Data = fun integer_to_binary/1, + + ok = lists:foreach( + fun(I) -> + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId(I), Name, Data(I)) + end, + lists:seq(1, N) + ), + + {ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(#{transfer => {ClientId, FileId(123)}}), + + ?assertS3Data(Data(123), Uri), + + lists:foreach( + fun(PageSize) -> + Pages = file_pages(#{limit => PageSize}), + ?assertEqual( + expected_page_count(PageSize, N), + length(Pages) + ), + FileIds = [ + FId + || #{transfer := {_, FId}} <- lists:concat(Pages) + ], + ?assertEqual( + lists:sort([FileId(I) || I <- lists:seq(1, N)]), + lists:sort(FileIds) + ) + end, + %% less than S3 limit, greater than S3 limit + [20, 550] + ). + +t_invalid_cursor(_Config) -> + InvalidUtf8 = <<16#80>>, + ?assertError( + {badarg, cursor}, + emqx_ft_storage:files(#{following => InvalidUtf8}) + ). + +%%-------------------------------------------------------------------- +%% Helper Functions +%%-------------------------------------------------------------------- + +expected_page_count(PageSize, Total) -> + case Total rem PageSize of + 0 -> Total div PageSize; + _ -> Total div PageSize + 1 + end. + +file_pages(Query) -> + case emqx_ft_storage:files(Query) of + {ok, #{items := Items, cursor := NewCursor}} -> + [Items] ++ file_pages(Query#{following => NewCursor}); + {ok, #{items := Items}} -> + [Items]; + {error, Error} -> + ct:fail("Failed to download files: ~p", [Error]) + end. + +metadata_field(Field, Meta) -> + Key = "x-amz-meta-" ++ Field, + case lists:keyfind(Key, 1, Meta) of + {Key, Value} -> list_to_binary(Value); + false -> false + end. + +create_bucket() -> + BucketName = emqx_s3_test_helpers:unique_bucket(), + _ = application:ensure_all_started(lhttpc), + ok = erlcloud_s3:create_bucket(BucketName, emqx_ft_test_helpers:aws_config()), + BucketName. diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index 5635d981b..2acb57a8e 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -84,7 +84,9 @@ client_id(Config) -> atom_to_binary(?config(tc, Config), utf8). storage(Config) -> - emqx_ft_test_helpers:local_storage(Config). + RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)}, + #{storage := Storage} = emqx_ft_schema:translate(RawConfig), + Storage. list_files(Config) -> {ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}), diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 065e9ae0a..04aedf8f3 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -40,14 +40,15 @@ init_per_testcase(TC, Config) -> emqx_ft, fun(emqx_ft) -> emqx_ft_test_helpers:load_config(#{ - storage => #{ - type => local, - segments => #{ - root => emqx_ft_test_helpers:root(Config, node(), [TC, segments]) + <<"enable">> => true, + <<"storage">> => #{ + <<"type">> => <<"local">>, + <<"segments">> => #{ + <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments]) }, - exporter => #{ - type => local, - root => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) + <<"exporter">> => #{ + <<"type">> => <<"local">>, + <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) } } }) diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl index 0ac5d2844..217205f6f 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl @@ -25,7 +25,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_ft]), + ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)), Config. end_per_suite(_Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 89e349fae..1482223d8 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -21,6 +21,9 @@ -include_lib("common_test/include/ct.hrl"). +-define(S3_HOST, <<"minio">>). +-define(S3_PORT, 9000). + start_additional_node(Config, Name) -> emqx_common_test_helpers:start_slave( Name, @@ -41,32 +44,41 @@ stop_additional_node(Node) -> env_handler(Config) -> fun (emqx_ft) -> - load_config(#{enable => true, storage => local_storage(Config)}); + load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)}); (_) -> ok end. local_storage(Config) -> + local_storage(Config, #{exporter => local}). + +local_storage(Config, Opts) -> #{ - type => local, - segments => #{ - root => root(Config, node(), [segments]) - }, - exporter => #{ - type => local, - root => root(Config, node(), [exports]) - } + <<"type">> => <<"local">>, + <<"segments">> => #{<<"root">> => root(Config, node(), [segments])}, + <<"exporter">> => exporter(Config, Opts) + }. + +exporter(Config, #{exporter := local}) -> + #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])}; +exporter(_Config, #{exporter := s3, bucket_name := BucketName}) -> + BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp), + BaseConfig#{ + <<"bucket">> => list_to_binary(BucketName), + <<"type">> => <<"s3">>, + <<"host">> => ?S3_HOST, + <<"port">> => ?S3_PORT }. load_config(Config) -> - emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}). + emqx_common_test_helpers:load_config(emqx_ft_schema, #{<<"file_transfer">> => Config}). tcp_port(Node) -> {_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. root(Config, Node, Tail) -> - filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]). + iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])). start_client(ClientId) -> start_client(ClientId, node()). @@ -94,9 +106,21 @@ upload_file(ClientId, FileId, Name, Data, Node) -> ct:pal("MetaPayload = ~ts", [MetaPayload]), MetaTopic = <<"$file/", FileId/binary, "/init">>, - {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), - {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1), + {ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), + {ok, #{reason_code_name := success}} = emqtt:publish( + C1, <<"$file/", FileId/binary, "/0">>, Data, 1 + ), FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, - {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1), - ok = emqtt:stop(C1). + FinResult = + case emqtt:publish(C1, FinTopic, <<>>, 1) of + {ok, #{reason_code_name := success}} -> + ok; + {ok, #{reason_code_name := Error}} -> + {error, Error} + end, + ok = emqtt:stop(C1), + FinResult. + +aws_config() -> + emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT). diff --git a/apps/emqx_s3/test/emqx_s3_test_helpers.erl b/apps/emqx_s3/test/emqx_s3_test_helpers.erl index 2edd52609..a73f618af 100644 --- a/apps/emqx_s3/test/emqx_s3_test_helpers.erl +++ b/apps/emqx_s3/test/emqx_s3_test_helpers.erl @@ -36,19 +36,24 @@ %%-------------------------------------------------------------------- aws_config(tcp) -> + aws_config(tcp, ?TCP_HOST, ?TCP_PORT); +aws_config(tls) -> + aws_config(tls, ?TLS_HOST, ?TLS_PORT). + +aws_config(tcp, Host, Port) -> erlcloud_s3_new( ?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, - ?TCP_HOST, - ?TCP_PORT, + Host, + Port, "http://" ); -aws_config(tls) -> +aws_config(tls, Host, Port) -> erlcloud_s3_new( ?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, - ?TLS_HOST, - ?TLS_PORT, + Host, + Port, "https://" ). diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon index e7e551289..64b9bd67e 100644 --- a/rel/i18n/emqx_ft_schema.hocon +++ b/rel/i18n/emqx_ft_schema.hocon @@ -3,8 +3,8 @@ emqx_ft_schema { enable.desc: """Enable the File Transfer feature.
Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.
-This toggle does not have an effect neither on the availability of the File Transfer REST API, nor -on storage-dependent background activities (e.g. garbage collection).""" +This toggle also affects the availability of the File Transfer REST API and +storage-dependent background activities (e.g. garbage collection).""" init_timeout.desc: """Timeout for initializing the file transfer.