diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index e42d9c3ae..6422d627c 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -29,6 +29,7 @@
-export([file_schema/1]).
-export([base_path/0]).
-export([relative_uri/1]).
+-export([compose_filters/2]).
-export([filter_check_request/2, filter_check_request_and_translate_body/2]).
@@ -82,14 +83,30 @@
-type request() :: #{bindings => map(), query_string => map(), body => map()}.
-type request_meta() :: #{module => module(), path => string(), method => atom()}.
--type filter_result() :: {ok, request()} | {400, 'BAD_REQUEST', binary()}.
--type filter() :: fun((request(), request_meta()) -> filter_result()).
+%% More exact types are defined in minirest.hrl, but we don't want to include it
+%% because it defines a lot of types and they may clash with the types declared locally.
+-type status_code() :: pos_integer().
+-type error_code() :: atom() | binary().
+-type error_message() :: binary().
+-type response_body() :: term().
+-type headers() :: map().
+
+-type response() ::
+ status_code()
+ | {status_code()}
+ | {status_code(), response_body()}
+ | {status_code(), headers(), response_body()}
+ | {status_code(), error_code(), error_message()}.
+
+-type filter_result() :: {ok, request()} | response().
+-type filter() :: emqx_maybe:t(fun((request(), request_meta()) -> filter_result())).
-type spec_opts() :: #{
check_schema => boolean() | filter(),
translate_body => boolean(),
schema_converter => fun((hocon_schema:schema(), Module :: atom()) -> map()),
- i18n_lang => atom() | string() | binary()
+ i18n_lang => atom() | string() | binary(),
+ filter => filter()
}.
-type route_path() :: string() | binary().
@@ -115,9 +132,9 @@ spec(Module, Options) ->
lists:foldl(
fun(Path, {AllAcc, AllRefsAcc}) ->
{OperationId, Specs, Refs} = parse_spec_ref(Module, Path, Options),
- CheckSchema = support_check_schema(Options),
+ Opts = #{filter => filter(Options)},
{
- [{filename:join("/", Path), Specs, OperationId, CheckSchema} | AllAcc],
+ [{filename:join("/", Path), Specs, OperationId, Opts} | AllAcc],
Refs ++ AllRefsAcc
}
end,
@@ -204,6 +221,21 @@ file_schema(FileName) ->
}
}.
+-spec compose_filters(filter(), filter()) -> filter().
+compose_filters(undefined, Filter2) ->
+ Filter2;
+compose_filters(Filter1, undefined) ->
+ Filter1;
+compose_filters(Filter1, Filter2) ->
+ fun(Request, RequestMeta) ->
+ case Filter1(Request, RequestMeta) of
+ {ok, Request1} ->
+ Filter2(Request1, RequestMeta);
+ Response ->
+ Response
+ end
+ end.
+
%%------------------------------------------------------------------------------
%% Private functions
%%------------------------------------------------------------------------------
@@ -235,14 +267,22 @@ check_only(Schema, Map, Opts) ->
_ = hocon_tconf:check_plain(Schema, Map, Opts),
Map.
-support_check_schema(#{check_schema := true, translate_body := true}) ->
- #{filter => fun ?MODULE:filter_check_request_and_translate_body/2};
-support_check_schema(#{check_schema := true}) ->
- #{filter => fun ?MODULE:filter_check_request/2};
-support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) ->
- #{filter => Filter};
-support_check_schema(_) ->
- #{filter => undefined}.
+filter(Options) ->
+ CheckSchemaFilter = check_schema_filter(Options),
+ CustomFilter = custom_filter(Options),
+ compose_filters(CheckSchemaFilter, CustomFilter).
+
+custom_filter(Options) ->
+ maps:get(filter, Options, undefined).
+
+check_schema_filter(#{check_schema := true, translate_body := true}) ->
+ fun ?MODULE:filter_check_request_and_translate_body/2;
+check_schema_filter(#{check_schema := true}) ->
+ fun ?MODULE:filter_check_request/2;
+check_schema_filter(#{check_schema := Filter}) when is_function(Filter, 2) ->
+ Filter;
+check_schema_filter(_) ->
+ undefined.
parse_spec_ref(Module, Path, Options) ->
Schema =
diff --git a/apps/emqx_ft/docker-ct b/apps/emqx_ft/docker-ct
new file mode 100644
index 000000000..36f9d86d3
--- /dev/null
+++ b/apps/emqx_ft/docker-ct
@@ -0,0 +1 @@
+minio
diff --git a/apps/emqx_ft/etc/emqx_ft.conf b/apps/emqx_ft/etc/emqx_ft.conf
index ec48d4e69..e69de29bb 100644
--- a/apps/emqx_ft/etc/emqx_ft.conf
+++ b/apps/emqx_ft/etc/emqx_ft.conf
@@ -1,15 +0,0 @@
-file_transfer {
- storage {
- type = local
- segments {
- root = "{{ platform_data_dir }}/file_transfer/segments"
- gc {
- interval = "1h"
- }
- }
- exporter {
- root = "{{ platform_data_dir }}/file_transfer/exports"
- type = local
- }
- }
-}
diff --git a/apps/emqx_ft/src/emqx_ft_api.erl b/apps/emqx_ft/src/emqx_ft_api.erl
index 7e1ed97ad..61a6a2c93 100644
--- a/apps/emqx_ft/src/emqx_ft_api.erl
+++ b/apps/emqx_ft/src/emqx_ft_api.erl
@@ -33,6 +33,9 @@
fields/1
]).
+%% Minirest filter for checking if file transfer is enabled
+-export([check_ft_enabled/2]).
+
%% API callbacks
-export([
'/file_transfer/files'/2,
@@ -44,7 +47,9 @@
namespace() -> "file_transfer".
api_spec() ->
- emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+ emqx_dashboard_swagger:spec(?MODULE, #{
+ check_schema => true, filter => fun ?MODULE:check_ft_enabled/2
+ }).
paths() ->
[
@@ -97,6 +102,14 @@ schema("/file_transfer/files/:clientid/:fileid") ->
}
}.
+check_ft_enabled(Params, _Meta) ->
+ case emqx_ft_conf:enabled() of
+ true ->
+ {ok, Params};
+ false ->
+ {503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
+ end.
+
'/file_transfer/files'(get, #{
query_string := QueryString
}) ->
diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl
index 90b59c8d1..61f639271 100644
--- a/apps/emqx_ft/src/emqx_ft_conf.erl
+++ b/apps/emqx_ft/src/emqx_ft_conf.erl
@@ -56,7 +56,7 @@ enabled() ->
-spec storage() -> _Storage.
storage() ->
- emqx_config:get([file_transfer, storage], undefined).
+ emqx_config:get([file_transfer, storage]).
-spec gc_interval(_Storage) -> emqx_maybe:t(milliseconds()).
gc_interval(Conf = #{type := local}) ->
@@ -88,11 +88,12 @@ store_segment_timeout() ->
-spec load() -> ok.
load() ->
- ok = on_config_update(#{}, emqx_config:get([file_transfer], #{})),
+ ok = maybe_start(),
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
unload() ->
+ ok = stop(),
emqx_conf:remove_handler([file_transfer]).
%%--------------------------------------------------------------------
@@ -115,23 +116,26 @@ pre_config_update(_, Req, _Config) ->
post_config_update([file_transfer | _], _Req, NewConfig, OldConfig, _AppEnvs) ->
on_config_update(OldConfig, NewConfig).
-on_config_update(OldConfig, NewConfig) ->
- lists:foreach(
- fun(ConfKey) ->
- on_config_update(
- ConfKey,
- maps:get(ConfKey, OldConfig, undefined),
- maps:get(ConfKey, NewConfig, undefined)
- )
- end,
- [storage, enable]
- ).
-
-on_config_update(_, Config, Config) ->
+on_config_update(#{enable := false}, #{enable := false}) ->
ok;
-on_config_update(storage, OldConfig, NewConfig) ->
- ok = emqx_ft_storage:on_config_update(OldConfig, NewConfig);
-on_config_update(enable, _, true) ->
+on_config_update(#{enable := true, storage := OldStorage}, #{enable := false}) ->
+ ok = emqx_ft_storage:on_config_update(OldStorage, undefined),
+ ok = emqx_ft:unhook();
+on_config_update(#{enable := false}, #{enable := true, storage := NewStorage}) ->
+ ok = emqx_ft_storage:on_config_update(undefined, NewStorage),
ok = emqx_ft:hook();
-on_config_update(enable, _, false) ->
- ok = emqx_ft:unhook().
+on_config_update(#{enable := true, storage := OldStorage}, #{enable := true, storage := NewStorage}) ->
+ ok = emqx_ft_storage:on_config_update(OldStorage, NewStorage).
+
+maybe_start() ->
+ case emqx_config:get([file_transfer]) of
+ #{enable := true, storage := Storage} ->
+ ok = emqx_ft_storage:on_config_update(undefined, Storage),
+ ok = emqx_ft:hook();
+ _ ->
+ ok
+ end.
+
+stop() ->
+ ok = emqx_ft:unhook(),
+ ok = emqx_ft_storage:on_config_update(storage(), undefined).
diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl
index 5364211a4..5ec342585 100644
--- a/apps/emqx_ft/src/emqx_ft_storage.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage.erl
@@ -90,6 +90,12 @@
-callback files(storage(), query(Cursor)) ->
{ok, page(file_info(), Cursor)} | {error, term()}.
+-callback start(emqx_config:config()) -> any().
+-callback stop(emqx_config:config()) -> any().
+
+-callback on_config_update(_OldConfig :: emqx_config:config(), _NewConfig :: emqx_config:config()) ->
+ any().
+
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@@ -167,49 +173,28 @@ apply_storage(Storage, Fun, Args) when is_function(Fun) ->
-spec on_config_update(_Old :: emqx_maybe:t(storage()), _New :: emqx_maybe:t(storage())) ->
ok.
-on_config_update(Storage, Storage) ->
+on_config_update(#{type := _} = Storage, #{type := _} = Storage) ->
ok;
on_config_update(#{type := Type} = StorageOld, #{type := Type} = StorageNew) ->
ok = (mod(StorageNew)):on_config_update(StorageOld, StorageNew);
-on_config_update(StorageOld, StorageNew) ->
+on_config_update(StorageOld, StorageNew) when
+ (StorageOld =:= undefined orelse is_map_key(type, StorageOld)) andalso
+ (StorageNew =:= undefined orelse is_map_key(type, StorageNew))
+->
_ = emqx_maybe:apply(fun on_storage_stop/1, StorageOld),
_ = emqx_maybe:apply(fun on_storage_start/1, StorageNew),
- _ = emqx_maybe:apply(
- fun(Storage) -> (mod(Storage)):on_config_update(StorageOld, StorageNew) end,
- StorageNew
- ),
ok.
-on_storage_start(Storage = #{type := _}) ->
- lists:foreach(
- fun(ChildSpec) ->
- {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
- end,
- child_spec(Storage)
- ).
-
-on_storage_stop(Storage = #{type := _}) ->
- lists:foreach(
- fun(#{id := ChildId}) ->
- _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
- ok = supervisor:delete_child(emqx_ft_sup, ChildId)
- end,
- child_spec(Storage)
- ).
-
-child_spec(Storage) ->
- try
- Mod = mod(Storage),
- Mod:child_spec(Storage)
- catch
- error:disabled -> [];
- error:undef -> []
- end.
-
%%--------------------------------------------------------------------
-%% Local FS API
+%% Local API
%%--------------------------------------------------------------------
+on_storage_start(Storage) ->
+ (mod(Storage)):start(Storage).
+
+on_storage_stop(Storage) ->
+ (mod(Storage)):stop(Storage).
+
storage() ->
emqx_ft_conf:storage().
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
index fb44093c1..e000fe5c6 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl
@@ -153,14 +153,14 @@ on_exporter_update(Config, Config) ->
on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
ExporterMod:update(ConfigOld, ConfigNew);
on_exporter_update(ExporterOld, ExporterNew) ->
- _ = emqx_maybe:apply(fun stop_exporter/1, ExporterOld),
- _ = emqx_maybe:apply(fun start_exporter/1, ExporterNew),
+ _ = emqx_maybe:apply(fun stop/1, ExporterOld),
+ _ = emqx_maybe:apply(fun start/1, ExporterNew),
ok.
-start_exporter({ExporterMod, ExporterOpts}) ->
+start({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:start(ExporterOpts).
-stop_exporter({ExporterMod, ExporterOpts}) ->
+stop({ExporterMod, ExporterOpts}) ->
ok = ExporterMod:stop(ExporterOpts).
%%------------------------------------------------------------------------------
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
index f1a8c6dae..a278b01a5 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl
@@ -47,7 +47,9 @@
namespace() -> "file_transfer".
api_spec() ->
- emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+ emqx_dashboard_swagger:spec(?MODULE, #{
+ check_schema => true, filter => fun emqx_ft_api:check_ft_enabled/2
+ }).
paths() ->
[
diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl
index c7110c74a..5c5aade86 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl
@@ -149,7 +149,7 @@ list(Client, _Options, #{transfer := Transfer}) ->
end;
list(Client, _Options, Query) ->
Limit = maps:get(limit, Query, undefined),
- Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)),
+ Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(following, Query, undefined)),
case list_pages(Client, Marker, Limit, []) of
{ok, {Exports, undefined}} ->
{ok, #{items => Exports}};
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
index bd88727c0..cdc86d218 100644
--- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
@@ -48,6 +48,8 @@
-export([files/2]).
-export([on_config_update/2]).
+-export([start/1]).
+-export([stop/1]).
-export_type([storage/0]).
-export_type([filefrag/1]).
@@ -227,6 +229,27 @@ on_config_update(StorageOld, StorageNew) ->
ok = emqx_ft_storage_fs_gc:reset(StorageNew),
emqx_ft_storage_exporter:on_config_update(StorageOld, StorageNew).
+start(Storage) ->
+ ok = lists:foreach(
+ fun(ChildSpec) ->
+ {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
+ end,
+ child_spec(Storage)
+ ),
+ ok = emqx_ft_storage_exporter:on_config_update(undefined, Storage),
+ ok.
+
+stop(Storage) ->
+ ok = emqx_ft_storage_exporter:on_config_update(Storage, undefined),
+ ok = lists:foreach(
+ fun(#{id := ChildId}) ->
+ _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
+ ok = supervisor:delete_child(emqx_ft_sup, ChildId)
+ end,
+ child_spec(Storage)
+ ),
+ ok.
+
%%
-spec transfers(storage()) ->
diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
index 4efa31205..f69e13a6d 100644
--- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
@@ -83,6 +83,8 @@ mk_cluster_specs(Config) ->
init_per_testcase(Case, Config) ->
[{tc, Case} | Config].
+end_per_testcase(t_ft_disabled, _Config) ->
+ emqx_config:put([file_transfer, enable], true);
end_per_testcase(_Case, _Config) ->
ok.
@@ -223,6 +225,35 @@ t_list_files_paging(Config) ->
?assertEqual(Files, PageThrough(#{limit => 8}, [])),
?assertEqual(Files, PageThrough(#{limit => NFiles}, [])).
+t_ft_disabled(_Config) ->
+ ?assertMatch(
+ {ok, 200, _},
+ request_json(get, uri(["file_transfer", "files"]))
+ ),
+
+ ?assertMatch(
+ {ok, 400, _},
+ request_json(
+ get,
+ uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>})
+ )
+ ),
+
+ ok = emqx_config:put([file_transfer, enable], false),
+
+ ?assertMatch(
+ {ok, 503, _},
+ request_json(get, uri(["file_transfer", "files"]))
+ ),
+
+ ?assertMatch(
+ {ok, 503, _},
+ request_json(
+ get,
+ uri(["file_transfer", "file"]) ++ query(#{fileref => <<"f1">>, node => node()})
+ )
+ ).
+
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
index 106c34702..bc9eb5d98 100644
--- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl
@@ -32,9 +32,10 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(_Case, Config) ->
- % NOTE: running each testcase with clean config
_ = emqx_config:save_schema_mod_and_names(emqx_ft_schema),
- ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], fun(_) -> ok end),
+ ok = emqx_common_test_helpers:start_apps(
+ [emqx_conf, emqx_ft], emqx_ft_test_helpers:env_handler(Config)
+ ),
{ok, _} = emqx:update_config([rpc, port_discovery], manual),
Config.
diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl
new file mode 100644
index 000000000..86de74ee0
--- /dev/null
+++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl
@@ -0,0 +1,199 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_exporter_s3_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(assertS3Data(Data, Url),
+ case httpc:request(Url) of
+ {ok, {{_StatusLine, 200, "OK"}, _Headers, Body}} ->
+ ?assertEqual(Data, list_to_binary(Body), "S3 data mismatch");
+ OtherResponse ->
+ ct:fail("Unexpected response: ~p", [OtherResponse])
+ end
+).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ Config.
+end_per_suite(_Config) ->
+ ok.
+
+set_special_configs(Config) ->
+ fun
+ (emqx_ft) ->
+ Storage = emqx_ft_test_helpers:local_storage(Config, #{
+ exporter => s3, bucket_name => ?config(bucket_name, Config)
+ }),
+ emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage});
+ (_) ->
+ ok
+ end.
+
+init_per_testcase(Case, Config0) ->
+ ClientId = atom_to_binary(Case),
+ BucketName = create_bucket(),
+ Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0],
+ ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)),
+ Config1.
+end_per_testcase(_Case, _Config) ->
+ ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Test Cases
+%%-------------------------------------------------------------------
+
+t_happy_path(Config) ->
+ ClientId = ?config(clientid, Config),
+
+ FileId = <<"🌚"/utf8>>,
+ Name = "cool_name",
+ Data = <<"data"/utf8>>,
+
+ ?assertEqual(
+ ok,
+ emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
+ ),
+
+ {ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(),
+
+ ?assertS3Data(Data, Uri),
+
+ Key = binary_to_list(ClientId) ++ "/" ++ binary_to_list(FileId) ++ "/" ++ Name,
+ Meta = erlcloud_s3:get_object_metadata(
+ ?config(bucket_name, Config), Key, emqx_ft_test_helpers:aws_config()
+ ),
+
+ ?assertEqual(
+ ClientId,
+ metadata_field("clientid", Meta)
+ ),
+
+ ?assertEqual(
+ FileId,
+ metadata_field("fileid", Meta)
+ ),
+
+ NameBin = list_to_binary(Name),
+ ?assertMatch(
+ #{
+ <<"name">> := NameBin,
+ <<"size">> := 4
+ },
+ emqx_utils_json:decode(metadata_field("filemeta", Meta), [return_maps])
+ ).
+
+t_upload_error(Config) ->
+ ClientId = ?config(clientid, Config),
+
+ FileId = <<"🌚"/utf8>>,
+ Name = "cool_name",
+ Data = <<"data"/utf8>>,
+
+ {ok, _} = emqx_conf:update(
+ [file_transfer, storage, exporter, bucket], <<"invalid-bucket">>, #{}
+ ),
+
+ ?assertEqual(
+ {error, unspecified_error},
+ emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, Data)
+ ).
+
+t_paging(Config) ->
+ ClientId = ?config(clientid, Config),
+ N = 1050,
+
+ FileId = fun integer_to_binary/1,
+ Name = "cool_name",
+ Data = fun integer_to_binary/1,
+
+ ok = lists:foreach(
+ fun(I) ->
+ ok = emqx_ft_test_helpers:upload_file(ClientId, FileId(I), Name, Data(I))
+ end,
+ lists:seq(1, N)
+ ),
+
+ {ok, #{items := [#{uri := Uri}]}} = emqx_ft_storage:files(#{transfer => {ClientId, FileId(123)}}),
+
+ ?assertS3Data(Data(123), Uri),
+
+ lists:foreach(
+ fun(PageSize) ->
+ Pages = file_pages(#{limit => PageSize}),
+ ?assertEqual(
+ expected_page_count(PageSize, N),
+ length(Pages)
+ ),
+ FileIds = [
+ FId
+ || #{transfer := {_, FId}} <- lists:concat(Pages)
+ ],
+ ?assertEqual(
+ lists:sort([FileId(I) || I <- lists:seq(1, N)]),
+ lists:sort(FileIds)
+ )
+ end,
+ %% less than S3 limit, greater than S3 limit
+ [20, 550]
+ ).
+
+t_invalid_cursor(_Config) ->
+ InvalidUtf8 = <<16#80>>,
+ ?assertError(
+ {badarg, cursor},
+ emqx_ft_storage:files(#{following => InvalidUtf8})
+ ).
+
+%%--------------------------------------------------------------------
+%% Helper Functions
+%%--------------------------------------------------------------------
+
+expected_page_count(PageSize, Total) ->
+ case Total rem PageSize of
+ 0 -> Total div PageSize;
+ _ -> Total div PageSize + 1
+ end.
+
+file_pages(Query) ->
+ case emqx_ft_storage:files(Query) of
+ {ok, #{items := Items, cursor := NewCursor}} ->
+ [Items] ++ file_pages(Query#{following => NewCursor});
+ {ok, #{items := Items}} ->
+ [Items];
+ {error, Error} ->
+ ct:fail("Failed to download files: ~p", [Error])
+ end.
+
+metadata_field(Field, Meta) ->
+ Key = "x-amz-meta-" ++ Field,
+ case lists:keyfind(Key, 1, Meta) of
+ {Key, Value} -> list_to_binary(Value);
+ false -> false
+ end.
+
+create_bucket() ->
+ BucketName = emqx_s3_test_helpers:unique_bucket(),
+ _ = application:ensure_all_started(lhttpc),
+ ok = erlcloud_s3:create_bucket(BucketName, emqx_ft_test_helpers:aws_config()),
+ BucketName.
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
index 5635d981b..2acb57a8e 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl
@@ -84,7 +84,9 @@ client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8).
storage(Config) ->
- emqx_ft_test_helpers:local_storage(Config).
+ RawConfig = #{<<"storage">> => emqx_ft_test_helpers:local_storage(Config)},
+ #{storage := Storage} = emqx_ft_schema:translate(RawConfig),
+ Storage.
list_files(Config) ->
{ok, #{items := Files}} = emqx_ft_storage_fs:files(storage(Config), #{}),
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
index 065e9ae0a..04aedf8f3 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl
@@ -40,14 +40,15 @@ init_per_testcase(TC, Config) ->
emqx_ft,
fun(emqx_ft) ->
emqx_ft_test_helpers:load_config(#{
- storage => #{
- type => local,
- segments => #{
- root => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
+ <<"enable">> => true,
+ <<"storage">> => #{
+ <<"type">> => <<"local">>,
+ <<"segments">> => #{
+ <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
},
- exporter => #{
- type => local,
- root => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
+ <<"exporter">> => #{
+ <<"type">> => <<"local">>,
+ <<"root">> => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
}
}
})
diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl
index 0ac5d2844..217205f6f 100644
--- a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl
@@ -25,7 +25,7 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- ok = emqx_common_test_helpers:start_apps([emqx_ft]),
+ ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)),
Config.
end_per_suite(_Config) ->
diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
index 89e349fae..1482223d8 100644
--- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl
+++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl
@@ -21,6 +21,9 @@
-include_lib("common_test/include/ct.hrl").
+-define(S3_HOST, <<"minio">>).
+-define(S3_PORT, 9000).
+
start_additional_node(Config, Name) ->
emqx_common_test_helpers:start_slave(
Name,
@@ -41,32 +44,41 @@ stop_additional_node(Node) ->
env_handler(Config) ->
fun
(emqx_ft) ->
- load_config(#{enable => true, storage => local_storage(Config)});
+ load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)});
(_) ->
ok
end.
local_storage(Config) ->
+ local_storage(Config, #{exporter => local}).
+
+local_storage(Config, Opts) ->
#{
- type => local,
- segments => #{
- root => root(Config, node(), [segments])
- },
- exporter => #{
- type => local,
- root => root(Config, node(), [exports])
- }
+ <<"type">> => <<"local">>,
+ <<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
+ <<"exporter">> => exporter(Config, Opts)
+ }.
+
+exporter(Config, #{exporter := local}) ->
+ #{<<"type">> => <<"local">>, <<"root">> => root(Config, node(), [exports])};
+exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
+ BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
+ BaseConfig#{
+ <<"bucket">> => list_to_binary(BucketName),
+ <<"type">> => <<"s3">>,
+ <<"host">> => ?S3_HOST,
+ <<"port">> => ?S3_PORT
}.
load_config(Config) ->
- emqx_common_test_helpers:load_config(emqx_ft_schema, #{file_transfer => Config}).
+ emqx_common_test_helpers:load_config(emqx_ft_schema, #{<<"file_transfer">> => Config}).
tcp_port(Node) ->
{_, Port} = rpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
Port.
root(Config, Node, Tail) ->
- filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail]).
+ iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])).
start_client(ClientId) ->
start_client(ClientId, node()).
@@ -94,9 +106,21 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
ct:pal("MetaPayload = ~ts", [MetaPayload]),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
- {ok, _} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
- {ok, _} = emqtt:publish(C1, <<"$file/", FileId/binary, "/0">>, Data, 1),
+ {ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1),
+ {ok, #{reason_code_name := success}} = emqtt:publish(
+ C1, <<"$file/", FileId/binary, "/0">>, Data, 1
+ ),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>,
- {ok, _} = emqtt:publish(C1, FinTopic, <<>>, 1),
- ok = emqtt:stop(C1).
+ FinResult =
+ case emqtt:publish(C1, FinTopic, <<>>, 1) of
+ {ok, #{reason_code_name := success}} ->
+ ok;
+ {ok, #{reason_code_name := Error}} ->
+ {error, Error}
+ end,
+ ok = emqtt:stop(C1),
+ FinResult.
+
+aws_config() ->
+ emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT).
diff --git a/apps/emqx_s3/test/emqx_s3_test_helpers.erl b/apps/emqx_s3/test/emqx_s3_test_helpers.erl
index 2edd52609..a73f618af 100644
--- a/apps/emqx_s3/test/emqx_s3_test_helpers.erl
+++ b/apps/emqx_s3/test/emqx_s3_test_helpers.erl
@@ -36,19 +36,24 @@
%%--------------------------------------------------------------------
aws_config(tcp) ->
+ aws_config(tcp, ?TCP_HOST, ?TCP_PORT);
+aws_config(tls) ->
+ aws_config(tls, ?TLS_HOST, ?TLS_PORT).
+
+aws_config(tcp, Host, Port) ->
erlcloud_s3_new(
?ACCESS_KEY_ID,
?SECRET_ACCESS_KEY,
- ?TCP_HOST,
- ?TCP_PORT,
+ Host,
+ Port,
"http://"
);
-aws_config(tls) ->
+aws_config(tls, Host, Port) ->
erlcloud_s3_new(
?ACCESS_KEY_ID,
?SECRET_ACCESS_KEY,
- ?TLS_HOST,
- ?TLS_PORT,
+ Host,
+ Port,
"https://"
).
diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon
index e7e551289..64b9bd67e 100644
--- a/rel/i18n/emqx_ft_schema.hocon
+++ b/rel/i18n/emqx_ft_schema.hocon
@@ -3,8 +3,8 @@ emqx_ft_schema {
enable.desc:
"""Enable the File Transfer feature.
Enabling File Transfer implies reserving special MQTT topics in order to serve the protocol.
-This toggle does not have an effect neither on the availability of the File Transfer REST API, nor
-on storage-dependent background activities (e.g. garbage collection)."""
+This toggle also affects the availability of the File Transfer REST API and
+storage-dependent background activities (e.g. garbage collection)."""
init_timeout.desc:
"""Timeout for initializing the file transfer.