diff --git a/.ci/docker-compose-file/docker-compose-minio-tcp.yaml b/.ci/docker-compose-file/docker-compose-minio-tcp.yaml new file mode 100644 index 000000000..93e1c4ead --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-minio-tcp.yaml @@ -0,0 +1,21 @@ +version: '3.7' + +services: + minio: + hostname: minio + image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z + command: server --address ":9000" --console-address ":9001" /minio-data + expose: + - "9000" + - "9001" + ports: + - "9000:9000" + - "9001:9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 5s + retries: 3 + networks: + emqx_bridge: + diff --git a/.ci/docker-compose-file/docker-compose-minio-tls.yaml b/.ci/docker-compose-file/docker-compose-minio-tls.yaml new file mode 100644 index 000000000..2e7a6bea5 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-minio-tls.yaml @@ -0,0 +1,23 @@ +version: '3.7' + +services: + minio_tls: + hostname: minio-tls + image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z + command: server --certs-dir /etc/certs --address ":9100" --console-address ":9101" /minio-data + volumes: + - ./certs/server.crt:/etc/certs/public.crt + - ./certs/server.key:/etc/certs/private.key + expose: + - "9100" + - "9101" + ports: + - "9100:9100" + - "9101:9101" + healthcheck: + test: ["CMD", "curl", "-k", "-f", "https://localhost:9100/minio/health/live"] + interval: 30s + timeout: 5s + retries: 3 + networks: + emqx_bridge: diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 9a1d08ba6..0cf689921 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -13,18 +13,34 @@ services: volumes: - "./toxiproxy.json:/config/toxiproxy.json" ports: + # Toxiproxy management API - 8474:8474 + # InfluxDB - 8086:8086 + # InfluxDB TLS - 8087:8087 + # MySQL - 13306:3306 + # MySQL TLS - 13307:3307 + # PostgreSQL - 15432:5432 + # PostgreSQL TLS - 15433:5433 + # TDEngine - 16041:6041 + # DynamoDB - 18000:8000 + # RocketMQ - 19876:9876 + # Cassandra - 19042:9042 + # Cassandra TLS - 19142:9142 + # S3 + - 19000:19000 + # S3 TLS + - 19100:19100 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 708cbf1ef..d8dd8a166 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -95,5 +95,17 @@ "listen": "0.0.0.0:9142", "upstream": "cassandra:9142", "enabled": true + }, + { + "name": "minio_tcp", + "listen": "0.0.0.0:19000", + "upstream": "minio:9000", + "enabled": true + }, + { + "name": "minio_tls", + "listen": "0.0.0.0:19100", + "upstream": "minio-tls:9100", + "enabled": true } ] diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 0bee30e35..6bd36aab5 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 82b3d602f..feeb1ba75 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -146,7 +146,8 @@ basic_reboot_apps() -> emqx_authz, emqx_slow_subs, emqx_auto_subscribe, - emqx_plugins + emqx_plugins, + emqx_s3 ], case emqx_release:edition() of ce -> CE; diff --git a/apps/emqx_s3/BSL.txt b/apps/emqx_s3/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_s3/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_s3/README.md b/apps/emqx_s3/README.md new file mode 100644 index 000000000..3f049d1a8 --- /dev/null +++ b/apps/emqx_s3/README.md @@ -0,0 +1,133 @@ +# emqx_s3 + +EMQX S3 Application + +## Description + +This application provides functionality for uploading files to S3. + +## Usage + +The steps to integrate this application are: +* Integrate S3 configuration schema where needed. +* On _client_ application start: + * Call `emqx_s3:start_profile(ProfileName, ProfileConfig)` with configuration. + * Add `emqx_config_handler` hook to call `emqx_s3:start_profile(ProfileName, ProfileConfig)` when configuration is updated. +* On _client_ application stop, call `emqx_s3:stop_profile(ProfileName)`. + +`ProfileName` is a unique name used to distinguish different sets of S3 settings. Each profile has its own connection pool and configuration. + +To use S3 from a _client_ application: +* Create an uploader process with `{ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => MyKey})`. +* Write data with `emqx_s3_uploader:write(Pid, <<"data">>)`. +* Finish the uploader with `emqx_s3_uploader:complete(Pid)` or `emqx_s3_uploader:abort(Pid)`. + +### Configuration + +Example of integrating S3 configuration schema into a _client_ application `emqx_someapp`. + +```erlang +-module(emqx_someapp_schema). + +... + +roots() -> [someapp] +... + +fields(someapp) -> + [ + {other_setting, ...}, + {s3_settings, + mk( + hoconsc:ref(emqx_s3_schema, s3), + #{ + desc => ?DESC("s3_settings"), + required => true + } + )} + ]; +... + +``` + +### Application start and config hooks + +```erlang +-module(emqx_someapp_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +-export([ + pre_config_update/3, + post_config_update/5 +]). + +start(_StartType, _StartArgs) -> + ProfileConfig = emqx_config:get([someapp, s3_settings]), + ProfileName = someapp, + ok = emqx_s3:start_profile(ProfileName, ProfileConfig), + ok = emqx_config_handler:add_handler([someapp], ?MODULE). + +stop(_State) -> + ok = emqx_conf:remove_handler([someapp]), + ProfileName = someapp, + ok = emqx_s3:stop_profile(ProfileName). + +pre_config_update(_Path, NewConfig, _OldConfig) -> + {ok, NewConfig}. + +post_config_update(Path, _Req, NewConfig, _OldConfig, _AppEnvs) -> + NewProfileConfig = maps:get(s3_settings, NewConfig), + ProfileName = someapp, + %% more graceful error handling may be needed + ok = emqx_s3:update_profile(ProfileName, NewProfileConfig). + +``` + +### Uploader usage + +```erlang +-module(emqx_someapp_logic). +... + +-spec do_upload_data(Key :: string(), Data :: binary()) -> ok. +do_upload_data(Key, Data) -> + ProfileName = someapp, + {ok, Pid} = emqx_s3:start_uploader(ProfileName, #{key => Key}), + ok = emqx_s3_uploader:write(Pid, Data), + ok = emqx_s3_uploader:complete(Pid). + +``` + +## Design + +![Design](./docs/s3_app.png) + +* Each profile has its own supervisor `emqx_s3_profile_sup`. +* Under each profile supervisor, there is a + * `emqx_s3_profile_uploader_sup` supervisor for uploader processes. + * `emqx_s3_profile_conf` server for managing profile configuration. + +When an uploader process is started, it checkouts the actual S3 configuration for the profile from the `emqx_s3_profile_conf` server. It uses the obtained configuration and connection pool to upload data to S3 till the termination, even if the configuration is updated. + +`emqx_s3_profile_conf`: +* Keeps actual S3 configuration for the profile and creates a connection pool for the actual configuration. +* Creates a new connection pool when the configuration is updated. +* Keeps track of uploaders using connection pools. +* Drops connection pools when no uploaders are using it or after a timeout. + +The code is designed to allow a painless transition from `ehttpc` pool to any other HTTP pool/client. + +## Possible performance improvements + +One of the downsides of the current implementation is that there is a lot of message passing between the uploader client and the actual sockets. + +A possible improvement could be: +* Use a process-less HTTP client, like [Mint](https://github.com/elixir-mint/mint). +* Use a resource pool, like [NimblePool](https://github.com/dashbitco/nimble_pool) to manage the HTTP connections. It temporarily grants sockets to its clients. +* Do the buffering logic locally in the uploader client. +* Use `emqx_s3_client` directly from the uploader client. + +In this case, the data will be directly sent to the socket, without being sent to any intermediate processes. diff --git a/apps/emqx_s3/docker-ct b/apps/emqx_s3/docker-ct new file mode 100644 index 000000000..a5a001815 --- /dev/null +++ b/apps/emqx_s3/docker-ct @@ -0,0 +1,2 @@ +minio +toxiproxy diff --git a/apps/emqx_s3/docs/s3_app.png b/apps/emqx_s3/docs/s3_app.png new file mode 100644 index 000000000..372f16d37 Binary files /dev/null and b/apps/emqx_s3/docs/s3_app.png differ diff --git a/apps/emqx_s3/rebar.config b/apps/emqx_s3/rebar.config new file mode 100644 index 000000000..f8e4d4e42 --- /dev/null +++ b/apps/emqx_s3/rebar.config @@ -0,0 +1,6 @@ +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {erlcloud, {git, "https://github.com/savonarola/erlcloud", {tag, "3.6.7-emqx-1"}}} +]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_s3/src/emqx_s3.app.src b/apps/emqx_s3/src/emqx_s3.app.src new file mode 100644 index 000000000..7864ffb29 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3.app.src @@ -0,0 +1,14 @@ +{application, emqx_s3, [ + {description, "EMQX S3"}, + {vsn, "5.0.6"}, + {modules, []}, + {registered, [emqx_s3_sup]}, + {applications, [ + kernel, + stdlib, + gproc, + erlcloud, + ehttpc + ]}, + {mod, {emqx_s3_app, []}} +]}. diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl new file mode 100644 index 000000000..6d2577dca --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3). + +-include_lib("emqx/include/types.hrl"). + +-export([ + start_profile/2, + stop_profile/1, + update_profile/2, + start_uploader/2, + with_client/2 +]). + +-export_type([ + profile_id/0, + profile_config/0 +]). + +-type profile_id() :: term(). + +%% TODO: define fields +-type profile_config() :: map(). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()). +start_profile(ProfileId, ProfileConfig) -> + case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of + {ok, _} -> + ok; + {error, _} = Error -> + Error + end. + +-spec stop_profile(profile_id()) -> ok_or_error(term()). +stop_profile(ProfileId) -> + emqx_s3_sup:stop_profile(ProfileId). + +-spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()). +update_profile(ProfileId, ProfileConfig) -> + emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig). + +-spec start_uploader(profile_id(), emqx_s3_uploader:opts()) -> + supervisor:start_ret() | {error, profile_not_found}. +start_uploader(ProfileId, Opts) -> + emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts). + +-spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) -> + {error, profile_not_found} | Result. +with_client(ProfileId, Fun) when is_function(Fun, 1) -> + case emqx_s3_profile_conf:checkout_config(ProfileId) of + {ok, ClientConfig, _UploadConfig} -> + try + Fun(emqx_s3_client:create(ClientConfig)) + after + emqx_s3_profile_conf:checkin_config(ProfileId) + end; + {error, _} = Error -> + Error + end. diff --git a/apps/emqx_s3/src/emqx_s3_app.erl b/apps/emqx_s3/src/emqx_s3_app.erl new file mode 100644 index 000000000..8d8b0f7b9 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_app.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_Type, _Args) -> + {ok, Sup} = emqx_s3_sup:start_link(), + {ok, Sup}. + +stop(_State) -> + ok. diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl new file mode 100644 index 000000000..01d677922 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -0,0 +1,293 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_client). + +-include_lib("emqx/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("erlcloud/include/erlcloud_aws.hrl"). + +-compile(nowarn_export_all). +-compile(export_all). + +-export([ + create/1, + + put_object/3, + + start_multipart/2, + upload_part/5, + complete_multipart/4, + abort_multipart/3, + list/2, + + format/1 +]). + +-export_type([client/0]). + +-type s3_bucket_acl() :: + private + | public_read + | public_read_write + | authenticated_read + | bucket_owner_read + | bucket_owner_full_control. + +-type headers() :: #{binary() => binary()}. + +-type key() :: string(). +-type part_number() :: non_neg_integer(). +-type upload_id() :: string(). +-type etag() :: string(). + +-type upload_options() :: list({acl, s3_bucket_acl()}). + +-opaque client() :: #{ + aws_config := aws_config(), + options := upload_options(), + bucket := string(), + headers := headers() +}. + +-type config() :: #{ + scheme := string(), + host := string(), + port := part_number(), + bucket := string(), + headers := headers(), + acl := s3_bucket_acl(), + access_key_id := string() | undefined, + secret_access_key := string() | undefined, + http_pool := ecpool:pool_name(), + request_timeout := timeout() +}. + +-type s3_options() :: list({string(), string()}). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec create(config()) -> client(). +create(Config) -> + #{ + aws_config => aws_config(Config), + upload_options => upload_options(Config), + bucket => maps:get(bucket, Config), + headers => headers(Config) + }. + +-spec put_object(client(), key(), iodata()) -> ok_or_error(term()). +put_object( + #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, + Key, + Value +) -> + try erlcloud_s3:put_object(Bucket, Key, Value, Options, Headers, AwsConfig) of + Props when is_list(Props) -> + ok + catch + error:{aws_error, Reason} -> + ?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}), + {error, Reason} + end. + +-spec start_multipart(client(), key()) -> ok_or_error(upload_id(), term()). +start_multipart( + #{bucket := Bucket, upload_options := Options, headers := Headers, aws_config := AwsConfig}, + Key +) -> + case erlcloud_s3:start_multipart(Bucket, Key, Options, Headers, AwsConfig) of + {ok, Props} -> + {ok, proplists:get_value(uploadId, Props)}; + {error, Reason} -> + ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), + {error, Reason} + end. + +-spec upload_part(client(), key(), upload_id(), part_number(), iodata()) -> + ok_or_error(etag(), term()). +upload_part( + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + Key, + UploadId, + PartNumber, + Value +) -> + case erlcloud_s3:upload_part(Bucket, Key, UploadId, PartNumber, Value, Headers, AwsConfig) of + {ok, Props} -> + {ok, proplists:get_value(etag, Props)}; + {error, Reason} -> + ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}), + {error, Reason} + end. + +-spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()). +complete_multipart( + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId, ETags +) -> + case erlcloud_s3:complete_multipart(Bucket, Key, UploadId, ETags, Headers, AwsConfig) of + ok -> + ok; + {error, Reason} -> + ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), + {error, Reason} + end. + +-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). +abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> + case erlcloud_s3:abort_multipart(Bucket, Key, UploadId, [], Headers, AwsConfig) of + ok -> + ok; + {error, Reason} -> + ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}), + {error, Reason} + end. + +-spec list(client(), s3_options()) -> ok_or_error(term()). +list(#{bucket := Bucket, aws_config := AwsConfig}, Options) -> + try + {ok, erlcloud_s3:list_objects(Bucket, Options, AwsConfig)} + catch + error:{aws_error, Reason} -> + ?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}), + {error, Reason} + end. + +-spec format(client()) -> term(). +format(#{aws_config := AwsConfig} = Client) -> + Client#{aws_config => AwsConfig#aws_config{secret_access_key = "***"}}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +upload_options(Config) -> + [ + {acl, maps:get(acl, Config)} + ]. + +headers(#{headers := Headers}) -> + maps:to_list(Headers). + +aws_config(#{ + scheme := Scheme, + host := Host, + port := Port, + headers := Headers, + access_key_id := AccessKeyId, + secret_access_key := SecretAccessKey, + http_pool := HttpPool, + request_timeout := Timeout +}) -> + #aws_config{ + s3_scheme = Scheme, + s3_host = Host, + s3_port = Port, + s3_bucket_access_method = path, + + access_key_id = AccessKeyId, + secret_access_key = SecretAccessKey, + + http_client = request_fun(Headers, HttpPool), + timeout = Timeout + }. + +-type http_headers() :: [{binary(), binary()}]. +-type http_pool() :: term(). + +-spec request_fun(http_headers(), http_pool()) -> erlcloud_httpc:request_fun(). +request_fun(CustomHeaders, HttpPool) -> + fun(Url, Method, Headers, Body, Timeout, _Config) -> + with_path_and_query_only(Url, fun(PathQuery) -> + JoinedHeaders = join_headers(Headers, CustomHeaders), + Request = make_request(Method, PathQuery, JoinedHeaders, Body), + ehttpc_request(HttpPool, Method, Request, Timeout) + end) + end. + +ehttpc_request(HttpPool, Method, Request, Timeout) -> + try ehttpc:request(HttpPool, Method, Request, Timeout) of + {ok, StatusCode, RespHeaders} -> + {ok, {{StatusCode, undefined}, string_headers(RespHeaders), undefined}}; + {ok, StatusCode, RespHeaders, RespBody} -> + {ok, {{StatusCode, undefined}, string_headers(RespHeaders), RespBody}}; + {error, Reason} -> + ?SLOG(error, #{ + msg => "s3_ehttpc_request_fail", + reason => Reason, + timeout => Timeout, + pool => HttpPool, + method => Method + }), + {error, Reason} + catch + error:badarg -> + ?SLOG(error, #{ + msg => "s3_ehttpc_request_fail", + reason => badarg, + timeout => Timeout, + pool => HttpPool, + method => Method + }), + {error, no_ehttpc_pool}; + error:Reason -> + ?SLOG(error, #{ + msg => "s3_ehttpc_request_fail", + reason => Reason, + timeout => Timeout, + pool => HttpPool, + method => Method + }), + {error, Reason} + end. + +-define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)). +-define(NEEDS_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)). + +make_request(Method, PathQuery, Headers, Body) when + ?IS_BODY_EMPTY(Body) andalso ?NEEDS_BODY(Method) +-> + {PathQuery, Headers}; +make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) -> + {PathQuery, [{<<"content-length">>, <<"0">>} | Headers], <<>>}; +make_request(_Method, PathQuery, Headers, Body) -> + {PathQuery, Headers, Body}. + +format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}. + +join_headers(Headers, CustomHeaders) -> + MapHeaders = lists:foldl( + fun({K, V}, MHeaders) -> + maps:put(to_binary(K), V, MHeaders) + end, + #{}, + Headers ++ maps:to_list(CustomHeaders) + ), + maps:to_list(MapHeaders). + +with_path_and_query_only(Url, Fun) -> + case string:split(Url, "//", leading) of + [_Scheme, UrlRem] -> + case string:split(UrlRem, "/", leading) of + [_HostPort, PathQuery] -> + Fun([$/ | PathQuery]); + _ -> + {error, {invalid_url, Url}} + end; + _ -> + {error, {invalid_url, Url}} + end. + +to_binary(Val) when is_list(Val) -> list_to_binary(Val); +to_binary(Val) when is_binary(Val) -> Val. + +string_headers(Hdrs) -> + [{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- Hdrs]. + +to_list_string(Val) when is_binary(Val) -> + binary_to_list(Val); +to_list_string(Val) when is_list(Val) -> + Val. diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl new file mode 100644 index 000000000..09e945edc --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -0,0 +1,390 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_conf). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([ + start_link/2, + child_spec/2 +]). + +-export([ + checkout_config/1, + checkout_config/2, + checkin_config/1, + checkin_config/2, + + update_config/2, + update_config/3 +]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +%% For test purposes +-export([ + client_config/2, + start_http_pool/2, + id/1 +]). + +-define(DEFAULT_CALL_TIMEOUT, 5000). + +-define(DEFAULT_HTTP_POOL_TIMEOUT, 60000). +-define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000). + +-spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec(). +child_spec(ProfileId, ProfileConfig) -> + #{ + id => ProfileId, + start => {?MODULE, start_link, [ProfileId, ProfileConfig]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [?MODULE] + }. + +-spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> gen_server:start_ret(). +start_link(ProfileId, ProfileConfig) -> + gen_server:start_link(?MODULE, [ProfileId, ProfileConfig], []). + +-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config()) -> ok_or_error(term()). +update_config(ProfileId, ProfileConfig) -> + update_config(ProfileId, ProfileConfig, ?DEFAULT_CALL_TIMEOUT). + +-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config(), timeout()) -> + ok_or_error(term()). +update_config(ProfileId, ProfileConfig, Timeout) -> + case gproc:where({n, l, id(ProfileId)}) of + undefined -> + {error, profile_not_found}; + Pid -> + gen_server:call(Pid, {update_config, ProfileConfig}, Timeout) + end. + +-spec checkout_config(emqx_s3:profile_id()) -> + {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. +checkout_config(ProfileId) -> + checkout_config(ProfileId, ?DEFAULT_CALL_TIMEOUT). + +-spec checkout_config(emqx_s3:profile_id(), timeout()) -> + {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. +checkout_config(ProfileId, Timeout) -> + case gproc:where({n, l, id(ProfileId)}) of + undefined -> + {error, profile_not_found}; + Pid -> + gen_server:call(Pid, {checkout_config, self()}, Timeout) + end. + +-spec checkin_config(emqx_s3:profile_id()) -> ok | {error, profile_not_found}. +checkin_config(ProfileId) -> + checkin_config(ProfileId, ?DEFAULT_CALL_TIMEOUT). + +-spec checkin_config(emqx_s3:profile_id(), timeout()) -> ok | {error, profile_not_found}. +checkin_config(ProfileId, Timeout) -> + case gproc:where({n, l, id(ProfileId)}) of + undefined -> + {error, profile_not_found}; + Pid -> + gen_server:call(Pid, {checkin_config, self()}, Timeout) + end. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([ProfileId, ProfileConfig]) -> + _ = process_flag(trap_exit, true), + ok = cleanup_orphaned_pools(ProfileId), + case start_http_pool(ProfileId, ProfileConfig) of + {ok, PoolName} -> + true = gproc:reg({n, l, id(ProfileId)}, ignored), + HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig), + {ok, #{ + profile_id => ProfileId, + profile_config => ProfileConfig, + client_config => client_config(ProfileConfig, PoolName), + uploader_config => uploader_config(ProfileConfig), + pool_name => PoolName, + pool_clients => emqx_s3_profile_http_pool_clients:create_table(), + %% We don't expose these options to users currently, but use in tests + http_pool_timeout => http_pool_timeout(ProfileConfig), + http_pool_cleanup_interval => HttpPoolCleanupInterval, + + outdated_pool_cleanup_tref => erlang:send_after( + HttpPoolCleanupInterval, self(), cleanup_outdated + ) + }}; + {error, Reason} -> + {stop, Reason} + end. + +handle_call( + {checkout_config, Pid}, + _From, + #{ + client_config := ClientConfig, + uploader_config := UploaderConfig + } = State +) -> + ok = register_client(Pid, State), + {reply, {ok, ClientConfig, UploaderConfig}, State}; +handle_call({checkin_config, Pid}, _From, State) -> + ok = unregister_client(Pid, State), + {reply, ok, State}; +handle_call( + {update_config, NewProfileConfig}, + _From, + #{profile_id := ProfileId} = State +) -> + case update_http_pool(ProfileId, NewProfileConfig, State) of + {ok, PoolName} -> + NewState = State#{ + profile_config => NewProfileConfig, + client_config => client_config(NewProfileConfig, PoolName), + uploader_config => uploader_config(NewProfileConfig), + http_pool_timeout => http_pool_timeout(NewProfileConfig), + http_pool_cleanup_interval => http_pool_cleanup_interval(NewProfileConfig), + pool_name => PoolName + }, + {reply, ok, NewState}; + {error, Reason} -> + {reply, {error, Reason}, State} + end; +handle_call(_Request, _From, State) -> + {reply, {error, not_implemented}, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) -> + ok = unregister_client(Pid, State), + {noreply, State}; +handle_info(cleanup_outdated, #{http_pool_cleanup_interval := HttpPoolCleanupInterval} = State0) -> + %% Maybe cleanup asynchoronously + ok = cleanup_outdated_pools(State0), + State1 = State0#{ + outdated_pool_cleanup_tref => erlang:send_after( + HttpPoolCleanupInterval, self(), cleanup_outdated + ) + }, + {noreply, State1}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #{profile_id := ProfileId}) -> + lists:foreach( + fun(PoolName) -> + ok = stop_http_pool(ProfileId, PoolName) + end, + emqx_s3_profile_http_pools:all(ProfileId) + ). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +id(ProfileId) -> + {?MODULE, ProfileId}. + +client_config(ProfileConfig, PoolName) -> + HTTPOpts = maps:get(transport_options, ProfileConfig, #{}), + #{ + scheme => scheme(HTTPOpts), + host => maps:get(host, ProfileConfig), + port => maps:get(port, ProfileConfig), + headers => maps:get(headers, HTTPOpts, #{}), + acl => maps:get(acl, ProfileConfig), + bucket => maps:get(bucket, ProfileConfig), + access_key_id => maps:get(access_key_id, ProfileConfig, undefined), + secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined), + request_timeout => maps:get(request_timeout, HTTPOpts, undefined), + http_pool => PoolName + }. + +uploader_config(#{max_part_size := MaxPartSize, min_part_size := MinPartSize} = _ProfileConfig) -> + #{ + min_part_size => MinPartSize, + max_part_size => MaxPartSize + }. + +scheme(#{ssl := #{enable := true}}) -> "https://"; +scheme(_TransportOpts) -> "http://". + +start_http_pool(ProfileId, ProfileConfig) -> + HttpConfig = http_config(ProfileConfig), + PoolName = pool_name(ProfileId), + case do_start_http_pool(PoolName, HttpConfig) of + ok -> + ok = emqx_s3_profile_http_pools:register(ProfileId, PoolName), + ok = ?tp(debug, "s3_start_http_pool", #{pool_name => PoolName, profile_id => ProfileId}), + {ok, PoolName}; + {error, _} = Error -> + Error + end. + +update_http_pool(ProfileId, ProfileConfig, #{pool_name := OldPoolName} = State) -> + HttpConfig = http_config(ProfileConfig), + OldHttpConfig = old_http_config(State), + case OldHttpConfig =:= HttpConfig of + true -> + {ok, OldPoolName}; + false -> + PoolName = pool_name(ProfileId), + case do_start_http_pool(PoolName, HttpConfig) of + ok -> + ok = set_old_pool_outdated(State), + ok = emqx_s3_profile_http_pools:register(ProfileId, PoolName), + {ok, PoolName}; + {error, _} = Error -> + Error + end + end. + +pool_name(ProfileId) -> + iolist_to_binary([ + <<"s3-http-">>, + ProfileId, + <<"-">>, + integer_to_binary(erlang:system_time(millisecond)), + <<"-">>, + integer_to_binary(erlang:unique_integer([positive])) + ]). + +old_http_config(#{profile_config := ProfileConfig}) -> http_config(ProfileConfig). + +set_old_pool_outdated(#{ + profile_id := ProfileId, pool_name := PoolName, http_pool_timeout := HttpPoolTimeout +}) -> + _ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout), + ok. + +cleanup_orphaned_pools(ProfileId) -> + lists:foreach( + fun(PoolName) -> + ok = stop_http_pool(ProfileId, PoolName) + end, + emqx_s3_profile_http_pools:all(ProfileId) + ). + +register_client(Pid, #{profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName}) -> + MRef = monitor(process, Pid), + ok = emqx_s3_profile_http_pool_clients:register(PoolClients, Pid, MRef, PoolName), + _ = emqx_s3_profile_http_pools:register_client(ProfileId, PoolName), + ok. + +unregister_client( + Pid, + #{ + profile_id := ProfileId, pool_clients := PoolClients, pool_name := PoolName + } +) -> + case emqx_s3_profile_http_pool_clients:unregister(PoolClients, Pid) of + undefined -> + ok; + {MRef, PoolName} -> + true = erlang:demonitor(MRef, [flush]), + _ = emqx_s3_profile_http_pools:unregister_client(ProfileId, PoolName), + ok; + {MRef, OutdatedPoolName} -> + true = erlang:demonitor(MRef, [flush]), + ClientNum = emqx_s3_profile_http_pools:unregister_client(ProfileId, OutdatedPoolName), + maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, ClientNum) + end. + +maybe_stop_outdated_pool(ProfileId, OutdatedPoolName, 0) -> + ok = stop_http_pool(ProfileId, OutdatedPoolName); +maybe_stop_outdated_pool(_ProfileId, _OutdatedPoolName, _ClientNum) -> + ok. + +cleanup_outdated_pools(#{profile_id := ProfileId}) -> + lists:foreach( + fun(PoolName) -> + ok = stop_http_pool(ProfileId, PoolName) + end, + emqx_s3_profile_http_pools:outdated(ProfileId) + ). + +%%-------------------------------------------------------------------- +%% HTTP Pool implementation dependent functions +%%-------------------------------------------------------------------- + +http_config( + #{ + host := Host, + port := Port, + transport_options := #{ + pool_type := PoolType, + pool_size := PoolSize, + enable_pipelining := EnablePipelining, + connect_timeout := ConnectTimeout + } = HTTPOpts + } +) -> + {Transport, TransportOpts} = + case scheme(HTTPOpts) of + "http://" -> + {tcp, []}; + "https://" -> + SSLOpts = emqx_tls_lib:to_client_opts(maps:get(ssl, HTTPOpts)), + {tls, SSLOpts} + end, + NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), + [ + {host, Host}, + {port, Port}, + {connect_timeout, ConnectTimeout}, + {keepalive, 30000}, + {pool_type, PoolType}, + {pool_size, PoolSize}, + {transport, Transport}, + {transport_opts, NTransportOpts}, + {enable_pipelining, EnablePipelining} + ]. + +http_pool_cleanup_interval(ProfileConfig) -> + maps:get( + http_pool_cleanup_interval, ProfileConfig, ?DEAFULT_HTTP_POOL_CLEANUP_INTERVAL + ). + +http_pool_timeout(ProfileConfig) -> + maps:get( + http_pool_timeout, ProfileConfig, ?DEFAULT_HTTP_POOL_TIMEOUT + ). + +stop_http_pool(ProfileId, PoolName) -> + case ehttpc_sup:stop_pool(PoolName) of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{msg => "ehttpc_pool_stop_fail", pool_name => PoolName, reason => Reason}), + ok + end, + ok = emqx_s3_profile_http_pools:unregister(ProfileId, PoolName), + ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}). + +do_start_http_pool(PoolName, HttpConfig) -> + case ehttpc_sup:start_pool(PoolName, HttpConfig) of + {ok, _} -> + ok; + {error, _} = Error -> + Error + end. diff --git a/apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl b/apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl new file mode 100644 index 000000000..b4e640f7c --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_profile_http_pool_clients.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_http_pool_clients). + +-export([ + create_table/0, + + register/4, + unregister/2 +]). + +-define(TAB, ?MODULE). + +-spec create_table() -> ok. +create_table() -> + ets:new(?TAB, [ + private, + set + ]). + +-spec register(ets:tid(), pid(), reference(), emqx_s3_profile_http_pools:pool_name()) -> true. +register(Tab, Pid, MRef, PoolName) -> + true = ets:insert(Tab, {Pid, {MRef, PoolName}}), + ok. + +-spec unregister(ets:tid(), pid()) -> emqx_s3_profile_http_pools:pool_name() | undefined. +unregister(Tab, Pid) -> + case ets:take(Tab, Pid) of + [{Pid, {MRef, PoolName}}] -> + {MRef, PoolName}; + [] -> + undefined + end. diff --git a/apps/emqx_s3/src/emqx_s3_profile_http_pools.erl b/apps/emqx_s3/src/emqx_s3_profile_http_pools.erl new file mode 100644 index 000000000..e1b36c3be --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_profile_http_pools.erl @@ -0,0 +1,123 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_http_pools). + +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([ + create_table/0, + + register/2, + unregister/2, + + register_client/2, + unregister_client/2, + + set_outdated/3, + + outdated/1, + all/1 +]). + +-export_type([pool_name/0]). + +-define(TAB, ?MODULE). + +-type pool_name() :: ecpool:pool_name(). + +-type pool_key() :: {emqx_s3:profile_id(), pool_name()}. + +-record(pool, { + key :: pool_key(), + client_count = 0 :: integer(), + deadline = undefined :: undefined | integer(), + extra = #{} :: map() +}). + +-spec create_table() -> ok. +create_table() -> + _ = ets:new(?TAB, [ + named_table, + public, + ordered_set, + {keypos, #pool.key}, + {read_concurrency, true}, + {write_concurrency, true} + ]), + ok. + +-spec register(emqx_s3:profile_id(), pool_name()) -> + ok. +register(ProfileId, PoolName) -> + Key = key(ProfileId, PoolName), + true = ets:insert(?TAB, #pool{ + key = Key, + client_count = 0, + deadline = undefined, + extra = #{} + }), + ok. + +-spec unregister(emqx_s3:profile_id(), pool_name()) -> + ok. +unregister(ProfileId, PoolName) -> + Key = key(ProfileId, PoolName), + true = ets:delete(?TAB, Key), + ok. + +-spec register_client(emqx_s3:profile_id(), pool_name()) -> + integer(). +register_client(ProfileId, PoolName) -> + Key = key(ProfileId, PoolName), + ets:update_counter(?TAB, Key, {#pool.client_count, 1}). + +-spec unregister_client(emqx_s3:profile_id(), pool_name()) -> + integer(). +unregister_client(ProfileId, PoolName) -> + Key = key(ProfileId, PoolName), + try + ets:update_counter(?TAB, Key, {#pool.client_count, -1}) + catch + error:badarg -> + undefined + end. + +-spec set_outdated(emqx_s3:profile_id(), pool_name(), integer()) -> + ok. +set_outdated(ProfileId, PoolName, Timeout) -> + Key = key(ProfileId, PoolName), + Now = erlang:monotonic_time(millisecond), + ets:update_element(?TAB, Key, {#pool.deadline, Now + Timeout}). + +-spec outdated(emqx_s3:profile_id()) -> + [pool_name()]. +outdated(ProfileId) -> + Now = erlang:monotonic_time(millisecond), + MS = ets:fun2ms( + fun(#pool{key = {ProfileId_, PoolName}, deadline = Deadline_}) when + ProfileId_ =:= ProfileId andalso + Deadline_ =/= undefined andalso Deadline_ < Now + -> + PoolName + end + ), + ets:select(?TAB, MS). + +-spec all(emqx_s3:profile_id()) -> + [pool_name()]. +all(ProfileId) -> + MS = ets:fun2ms( + fun(#pool{key = {ProfileId_, PoolName}}) when ProfileId_ =:= ProfileId -> + PoolName + end + ), + ets:select(?TAB, MS). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +key(ProfileId, PoolName) -> + {ProfileId, PoolName}. diff --git a/apps/emqx_s3/src/emqx_s3_profile_sup.erl b/apps/emqx_s3/src/emqx_s3_profile_sup.erl new file mode 100644 index 000000000..c39fc9f4b --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_profile_sup.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_sup). + +-behaviour(supervisor). + +-include_lib("emqx/include/types.hrl"). + +-export([ + start_link/2, + child_spec/2 +]). + +-export([init/1]). + +-spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:start_ret(). +start_link(ProfileId, ProfileConfig) -> + supervisor:start_link(?MODULE, [ProfileId, ProfileConfig]). + +-spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec(). +child_spec(ProfileId, ProfileConfig) -> + #{ + id => ProfileId, + start => {?MODULE, start_link, [ProfileId, ProfileConfig]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [?MODULE] + }. + +%%-------------------------------------------------------------------- +%% supervisor callbacks +%%------------------------------------------------------------------- + +init([ProfileId, ProfileConfig]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 5 + }, + ChildSpecs = [ + %% Order matters + emqx_s3_profile_conf:child_spec(ProfileId, ProfileConfig), + emqx_s3_profile_uploader_sup:child_spec(ProfileId) + ], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl new file mode 100644 index 000000000..1cd155a77 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl @@ -0,0 +1,73 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_uploader_sup). + +-behaviour(supervisor). + +-include_lib("emqx/include/types.hrl"). + +-export([ + start_link/1, + child_spec/1, + id/1, + start_uploader/2 +]). + +-export([init/1]). + +-export_type([id/0]). + +-type id() :: {?MODULE, emqx_s3:profile_id()}. + +-spec start_link(emqx_s3:profile_id()) -> supervisor:start_ret(). +start_link(ProfileId) -> + supervisor:start_link(?MODULE, [ProfileId]). + +-spec child_spec(emqx_s3:profile_id()) -> supervisor:child_spec(). +child_spec(ProfileId) -> + #{ + id => id(ProfileId), + start => {?MODULE, start_link, [ProfileId]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [?MODULE] + }. + +-spec id(emqx_s3:profile_id()) -> id(). +id(ProfileId) -> + {?MODULE, ProfileId}. + +-spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) -> + supervisor:start_ret() | {error, profile_not_found}. +start_uploader(ProfileId, Opts) -> + Id = id(ProfileId), + case gproc:where({n, l, Id}) of + undefined -> {error, profile_not_found}; + Pid -> supervisor:start_child(Pid, [Opts]) + end. + +%%-------------------------------------------------------------------- +%% supervisor callbacks +%%------------------------------------------------------------------- + +init([ProfileId]) -> + true = gproc:reg({n, l, id(ProfileId)}, ignored), + SupFlags = #{ + strategy => simple_one_for_one, + intensity => 10, + period => 5 + }, + ChildSpecs = [ + #{ + id => emqx_s3_uploader, + start => {emqx_s3_uploader, start_link, [ProfileId]}, + restart => temporary, + shutdown => 5000, + type => worker, + modules => [emqx_s3_uploader] + } + ], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl new file mode 100644 index 000000000..ceb0d1dd4 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, ref/1]). + +-export([roots/0, fields/1, namespace/0, tags/0]). + +-export([translate/1]). + +roots() -> + [s3]. + +namespace() -> "s3". + +tags() -> + [<<"S3">>]. + +fields(s3) -> + [ + {access_key_id, + mk( + string(), + #{ + desc => ?DESC("access_key_id"), + required => false + } + )}, + {secret_access_key, + mk( + string(), + #{ + desc => ?DESC("secret_access_key"), + required => false + } + )}, + {bucket, + mk( + string(), + #{ + desc => ?DESC("bucket"), + required => true + } + )}, + {host, + mk( + string(), + #{ + desc => ?DESC("host"), + required => true + } + )}, + {port, + mk( + pos_integer(), + #{ + desc => ?DESC("port"), + required => true + } + )}, + {min_part_size, + mk( + emqx_schema:bytesize(), + #{ + default => "5mb", + desc => ?DESC("min_part_size"), + required => true, + validator => fun part_size_validator/1 + } + )}, + {max_part_size, + mk( + emqx_schema:bytesize(), + #{ + default => "5gb", + desc => ?DESC("max_part_size"), + required => true, + validator => fun part_size_validator/1 + } + )}, + {acl, + mk( + hoconsc:enum([ + private, + public_read, + public_read_write, + authenticated_read, + bucket_owner_read, + bucket_owner_full_control + ]), + #{ + default => private, + desc => ?DESC("acl"), + required => true + } + )}, + {transport_options, + mk( + ref(transport_options), + #{ + desc => ?DESC("transport_options"), + required => false + } + )} + ]; +fields(transport_options) -> + props_without( + [base_url, max_retries, retry_interval, request], emqx_connector_http:fields(config) + ) ++ + props_with( + [headers, max_retries, request_timeout], emqx_connector_http:fields("request") + ). + +translate(Conf) -> + Options = #{atom_key => true}, + #{s3 := TranslatedConf} = hocon_tconf:check_plain( + emqx_s3_schema, #{<<"s3">> => Conf}, Options, [s3] + ), + TranslatedConf. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +props_with(Keys, Proplist) -> + lists:filter(fun({K, _}) -> lists:member(K, Keys) end, Proplist). + +props_without(Keys, Proplist) -> + lists:filter(fun({K, _}) -> not lists:member(K, Keys) end, Proplist). + +part_size_validator(PartSizeLimit) -> + case + PartSizeLimit >= 5 * 1024 * 1024 andalso + PartSizeLimit =< 5 * 1024 * 1024 * 1024 + of + true -> ok; + false -> {error, "must be at least 5mb and less than 5gb"} + end. diff --git a/apps/emqx_s3/src/emqx_s3_sup.erl b/apps/emqx_s3/src/emqx_s3_sup.erl new file mode 100644 index 000000000..0f6b0160b --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_sup.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_sup). + +-behaviour(supervisor). + +-include_lib("emqx/include/types.hrl"). + +-export([ + start_link/0, + start_profile/2, + stop_profile/1 +]). + +-export([init/1]). + +-spec start_link() -> supervisor:start_ret(). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_profile(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:startchild_ret(). +start_profile(ProfileId, ProfileConfig) -> + supervisor:start_child(?MODULE, emqx_s3_profile_sup:child_spec(ProfileId, ProfileConfig)). + +-spec stop_profile(emqx_s3:profile_id()) -> ok_or_error(term()). +stop_profile(ProfileId) -> + case supervisor:terminate_child(?MODULE, ProfileId) of + ok -> + supervisor:delete_child(?MODULE, ProfileId); + {error, Reason} -> + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% supervisor callbacks +%%------------------------------------------------------------------- + +init([]) -> + ok = emqx_s3_profile_http_pools:create_table(), + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 5 + }, + {ok, {SupFlags, []}}. diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl new file mode 100644 index 000000000..4e3fe15f2 --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -0,0 +1,318 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_uploader). + +-include_lib("emqx/include/types.hrl"). + +-behaviour(gen_statem). + +-export([ + start_link/2, + + write/2, + complete/1, + abort/1 +]). + +-export([ + init/1, + callback_mode/0, + handle_event/4, + terminate/3, + code_change/4, + format_status/1, + format_status/2 +]). + +-export_type([opts/0, config/0]). + +-type opts() :: #{ + name := string() +}. + +-type config() :: #{ + min_part_size := pos_integer() +}. + +-type data() :: #{ + profile_id := emqx_s3:profile_id(), + client := emqx_s3_client:client(), + key := emqx_s3_client:key(), + buffer := iodata(), + buffer_size := non_neg_integer(), + min_part_size := pos_integer(), + max_part_size := pos_integer(), + upload_id := undefined | emqx_s3_client:upload_id(), + etags := [emqx_s3_client:etag()], + part_number := emqx_s3_client:part_number() +}. + +%% 5MB +-define(DEFAULT_MIN_PART_SIZE, 5242880). +%% 5GB +-define(DEFAULT_MAX_PART_SIZE, 5368709120). + +-spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret(). +start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> + gen_statem:start_link(?MODULE, [ProfileId, Opts], []). + +-spec write(pid(), binary()) -> ok_or_error(term()). +write(Pid, WriteData) when is_binary(WriteData) -> + write(Pid, WriteData, infinity). + +-spec write(pid(), binary(), timeout()) -> ok_or_error(term()). +write(Pid, WriteData, Timeout) when is_binary(WriteData) -> + gen_statem:call(Pid, {write, wrap(WriteData)}, Timeout). + +-spec complete(pid()) -> ok_or_error(term()). +complete(Pid) -> + complete(Pid, infinity). + +-spec complete(pid(), timeout()) -> ok_or_error(term()). +complete(Pid, Timeout) -> + gen_statem:call(Pid, complete, Timeout). + +-spec abort(pid()) -> ok_or_error(term()). +abort(Pid) -> + abort(Pid, infinity). + +-spec abort(pid(), timeout()) -> ok_or_error(term()). +abort(Pid, Timeout) -> + gen_statem:call(Pid, abort, Timeout). + +%%-------------------------------------------------------------------- +%% gen_statem callbacks +%%-------------------------------------------------------------------- + +callback_mode() -> handle_event_function. + +init([ProfileId, #{key := Key}]) -> + process_flag(trap_exit, true), + {ok, ClientConfig, UploaderConfig} = emqx_s3_profile_conf:checkout_config(ProfileId), + Client = emqx_s3_client:create(ClientConfig), + {ok, upload_not_started, #{ + profile_id => ProfileId, + client => Client, + key => Key, + buffer => [], + buffer_size => 0, + min_part_size => maps:get(min_part_size, UploaderConfig, ?DEFAULT_MIN_PART_SIZE), + max_part_size => maps:get(max_part_size, UploaderConfig, ?DEFAULT_MAX_PART_SIZE), + upload_id => undefined, + etags => [], + part_number => 1 + }}. + +handle_event({call, From}, {write, WriteDataWrapped}, State, Data0) -> + WriteData = unwrap(WriteDataWrapped), + case is_valid_part(WriteData, Data0) of + true -> + handle_write(State, From, WriteData, Data0); + false -> + {keep_state_and_data, {reply, From, {error, {too_large, byte_size(WriteData)}}}} + end; +handle_event({call, From}, complete, upload_not_started, Data0) -> + case put_object(Data0) of + ok -> + {stop_and_reply, normal, {reply, From, ok}}; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data0} + end; +handle_event({call, From}, complete, upload_started, Data0) -> + case complete_upload(Data0) of + {ok, Data1} -> + {stop_and_reply, normal, {reply, From, ok}, Data1}; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data0} + end; +handle_event({call, From}, abort, upload_not_started, _Data) -> + {stop_and_reply, normal, {reply, From, ok}}; +handle_event({call, From}, abort, upload_started, Data0) -> + case abort_upload(Data0) of + ok -> + {stop_and_reply, normal, {reply, From, ok}}; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data0} + end. + +handle_write(upload_not_started, From, WriteData, Data0) -> + Data1 = append_buffer(Data0, WriteData), + case maybe_start_upload(Data1) of + not_started -> + {keep_state, Data1, {reply, From, ok}}; + {started, Data2} -> + case upload_part(Data2) of + {ok, Data3} -> + {next_state, upload_started, Data3, {reply, From, ok}}; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data2} + end; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data1} + end; +handle_write(upload_started, From, WriteData, Data0) -> + Data1 = append_buffer(Data0, WriteData), + case maybe_upload_part(Data1) of + {ok, Data2} -> + {keep_state, Data2, {reply, From, ok}}; + {error, _} = Error -> + {stop_and_reply, Error, {reply, From, Error}, Data1} + end. + +terminate(Reason, _State, #{client := Client, upload_id := UploadId, key := Key}) when + (UploadId =/= undefined) andalso (Reason =/= normal) +-> + emqx_s3_client:abort_multipart(Client, Key, UploadId); +terminate(_Reason, _State, _Data) -> + ok. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +format_status(#{data := #{client := Client} = Data} = Status) -> + Status#{ + data => Data#{ + client => emqx_s3_client:format(Client), + buffer => [<<"...">>] + } + }. + +format_status(_Opt, [PDict, State, #{client := Client} = Data]) -> + #{ + data => Data#{ + client => emqx_s3_client:format(Client), + buffer => [<<"...">>] + }, + state => State, + pdict => PDict + }. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +-spec maybe_start_upload(data()) -> not_started | {started, data()} | {error, term()}. +maybe_start_upload(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> + case BufferSize >= MinPartSize of + true -> + start_upload(Data); + false -> + not_started + end. + +-spec start_upload(data()) -> {started, data()} | {error, term()}. +start_upload(#{client := Client, key := Key} = Data) -> + case emqx_s3_client:start_multipart(Client, Key) of + {ok, UploadId} -> + NewData = Data#{upload_id => UploadId}, + {started, NewData}; + {error, _} = Error -> + Error + end. + +-spec maybe_upload_part(data()) -> ok_or_error(data(), term()). +maybe_upload_part(#{buffer_size := BufferSize, min_part_size := MinPartSize} = Data) -> + case BufferSize >= MinPartSize of + true -> + upload_part(Data); + false -> + % ct:print("buffer size: ~p, max part size: ~p, no upload", [BufferSize, MinPartSize]), + {ok, Data} + end. + +-spec upload_part(data()) -> ok_or_error(data(), term()). +upload_part(#{buffer_size := 0} = Data) -> + {ok, Data}; +upload_part( + #{ + client := Client, + key := Key, + upload_id := UploadId, + buffer := Buffer, + part_number := PartNumber, + etags := ETags + } = Data +) -> + case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of + {ok, ETag} -> + % ct:print("upload part ~p, etag: ~p", [PartNumber, ETag]), + NewData = Data#{ + buffer => [], + buffer_size => 0, + part_number => PartNumber + 1, + etags => [{PartNumber, ETag} | ETags] + }, + {ok, NewData}; + {error, _} = Error -> + % ct:print("upload part ~p failed: ~p", [PartNumber, Error]), + Error + end. + +-spec complete_upload(data()) -> ok_or_error(term()). +complete_upload( + #{ + client := Client, + key := Key, + upload_id := UploadId + } = Data0 +) -> + case upload_part(Data0) of + {ok, #{etags := ETags} = Data1} -> + case emqx_s3_client:complete_multipart(Client, Key, UploadId, lists:reverse(ETags)) of + ok -> + {ok, Data1}; + {error, _} = Error -> + Error + end; + {error, _} = Error -> + Error + end. + +-spec abort_upload(data()) -> ok_or_error(term()). +abort_upload( + #{ + client := Client, + key := Key, + upload_id := UploadId + } +) -> + case emqx_s3_client:abort_multipart(Client, Key, UploadId) of + ok -> + ok; + {error, _} = Error -> + Error + end. + +-spec put_object(data()) -> ok_or_error(term()). +put_object( + #{ + client := Client, + key := Key, + buffer := Buffer + } +) -> + case emqx_s3_client:put_object(Client, Key, lists:reverse(Buffer)) of + ok -> + ok; + {error, _} = Error -> + Error + end. + +-spec append_buffer(data(), binary()) -> data(). +append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> + Data#{ + buffer => [WriteData | Buffer], + buffer_size => BufferSize + byte_size(WriteData) + }. + +-compile({inline, [wrap/1, unwrap/1]}). +wrap(Data) -> + fun() -> Data end. + +unwrap(WrappedData) -> + WrappedData(). + +is_valid_part(WriteData, #{max_part_size := MaxPartSize, buffer_size := BufferSize}) -> + BufferSize + byte_size(WriteData) =< MaxPartSize. diff --git a/apps/emqx_s3/test/certs/ca.crt b/apps/emqx_s3/test/certs/ca.crt new file mode 100644 index 000000000..8a9dafccd --- /dev/null +++ b/apps/emqx_s3/test/certs/ca.crt @@ -0,0 +1,29 @@ +-----BEGIN CERTIFICATE----- +MIIE5DCCAswCCQCF3o0gIdaNDjANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQKDAlF +TVFYIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMTEy +MzAwODQxMTFaFw00OTA1MTcwODQxMTFaMDQxEjAQBgNVBAoMCUVNUVggVGVzdDEe +MBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF +AAOCAg8AMIICCgKCAgEAqmqSrxyH16j63QhqGLT1UO8I+m6BM3HfnJQM8laQdtJ0 +WgHqCh0/OphH3S7v4SfF4fNJDEJWMWuuzJzU9cTqHPLzhvo3+ZHcMIENgtY2p2Cf +7AQjEqFViEDyv2ZWNEe76BJeShntdY5NZr4gIPar99YGG/Ln8YekspleV+DU38rE +EX9WzhgBr02NN9z4NzIxeB+jdvPnxcXs3WpUxzfnUjOQf/T1tManvSdRbFmKMbxl +A8NLYK3oAYm8EbljWUINUNN6loqYhbigKv8bvo5S4xvRqmX86XB7sc0SApngtNcg +O0EKn8z/KVPDskE+8lMfGMiU2e2Tzw6Rph57mQPOPtIp5hPiKRik7ST9n0p6piXW +zRLplJEzSjf40I1u+VHmpXlWI/Fs8b1UkDSMiMVJf0LyWb4ziBSZOY2LtZzWHbWj +LbNgxQcwSS29tKgUwfEFmFcm+iOM59cPfkl2IgqVLh5h4zmKJJbfQKSaYb5fcKRf +50b1qsN40VbR3Pk/0lJ0/WqgF6kZCExmT1qzD5HJES/5grjjKA4zIxmHOVU86xOF +ouWvtilVR4PGkzmkFvwK5yRhBUoGH/A9BurhqOc0QCGay1kqHQFA6se4JJS+9KOS +x8Rn1Nm6Pi7sd6Le3cKmHTlyl5a/ofKqTCX2Qh+v/7y62V1V1wnoh3ipRjdPTnMC +AwEAATANBgkqhkiG9w0BAQsFAAOCAgEARCqaocvlMFUQjtFtepO2vyG1krn11xJ0 +e7md26i+g8SxCCYqQ9IqGmQBg0Im8fyNDKRN/LZoj5+A4U4XkG1yya91ZIrPpWyF +KUiRAItchNj3g1kHmI2ckl1N//6Kpx3DPaS7qXZaN3LTExf6Ph+StE1FnS0wVF+s +tsNIf6EaQ+ZewW3pjdlLeAws3jvWKUkROc408Ngvx74zbbKo/zAC4tz8oH9ZcpsT +WD8enVVEeUQKI6ItcpZ9HgTI9TFWgfZ1vYwvkoRwNIeabYI62JKmLEo2vGfGwWKr +c+GjnJ/tlVI2DpPljfWOnQ037/7yyJI/zo65+HPRmGRD6MuW/BdPDYOvOZUTcQKh +kANi5THSbJJgZcG3jb1NLebaUQ1H0zgVjn0g3KhUV+NJQYk8RQ7rHtB+MySqTKlM +kRkRjfTfR0Ykxpks7Mjvsb6NcZENf08ZFPd45+e/ptsxpiKu4e4W4bV7NZDvNKf9 +0/aD3oGYNMiP7s+KJ1lRSAjnBuG21Yk8FpzG+yr8wvJhV8aFgNQ5wIH86SuUTmN0 +5bVzFEIcUejIwvGoQEctNHBlOwHrb7zmB6OwyZeMapdXBQ+9UDhYg8ehDqdDOdfn +wsBcnjD2MwNhlE1hjL+tZWLNwSHiD6xx3LvNoXZu2HK8Cp3SOrkE69cFghYMIZZb +T+fp6tNL6LE= +-----END CERTIFICATE----- diff --git a/apps/emqx_s3/test/emqx_s3_SUITE.erl b/apps/emqx_s3/test/emqx_s3_SUITE.erl new file mode 100644 index 000000000..287dcb597 --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_SUITE.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_s3), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_s3). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_start_stop_update(_Config) -> + ProfileId = <<"test">>, + ProfileConfig = profile_config(), + + ?assertMatch( + ok, + emqx_s3:start_profile(ProfileId, ProfileConfig) + ), + + ?assertMatch( + {error, _}, + emqx_s3:start_profile(ProfileId, ProfileConfig) + ), + + ?assertEqual( + ok, + emqx_s3:update_profile(ProfileId, ProfileConfig) + ), + + ?assertMatch( + {error, _}, + emqx_s3:update_profile(<<"unknown">>, ProfileConfig) + ), + + ?assertEqual( + ok, + emqx_s3:stop_profile(ProfileId) + ), + + ?assertMatch( + {error, _}, + emqx_s3:stop_profile(ProfileId) + ). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +profile_config() -> + emqx_s3_test_helpers:base_config(tcp). diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl new file mode 100644 index 000000000..3d0d7bb18 --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -0,0 +1,104 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_client_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(PROFILE_ID, atom_to_binary(?MODULE)). + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + AllCases = emqx_common_test_helpers:all(?MODULE), + [ + {tcp, [], AllCases}, + {tls, [], AllCases} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_s3), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_s3). + +init_per_group(ConnType, Config) -> + [{conn_type, ConnType} | Config]. +end_per_group(_ConnType, _Config) -> + ok. + +init_per_testcase(_TestCase, Config0) -> + ConnType = ?config(conn_type, Config0), + + Bucket = emqx_s3_test_helpers:unique_bucket(), + TestAwsConfig = emqx_s3_test_helpers:aws_config(ConnType), + ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig), + Config1 = [ + {key, emqx_s3_test_helpers:unique_key()}, + {bucket, Bucket} + | Config0 + ], + {ok, PoolName} = emqx_s3_profile_conf:start_http_pool(?PROFILE_ID, profile_config(Config1)), + [{ehttpc_pool_name, PoolName} | Config1]. + +end_per_testcase(_TestCase, Config) -> + ok = ehttpc_sup:stop_pool(?config(ehttpc_pool_name, Config)). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_multipart_upload(Config) -> + Key = ?config(key, Config), + + Client = client(Config), + + {ok, UploadId} = emqx_s3_client:start_multipart(Client, Key), + + Data = data(6_000_000), + + {ok, Etag1} = emqx_s3_client:upload_part(Client, Key, UploadId, 1, Data), + {ok, Etag2} = emqx_s3_client:upload_part(Client, Key, UploadId, 2, Data), + + ok = emqx_s3_client:complete_multipart( + Client, Key, UploadId, [{1, Etag1}, {2, Etag2}] + ). + +t_simple_put(Config) -> + Key = ?config(key, Config), + + Client = client(Config), + + Data = data(6_000_000), + + ok = emqx_s3_client:put_object(Client, Key, Data). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +client(Config) -> + ClientConfig = emqx_s3_profile_conf:client_config( + profile_config(Config), ?config(ehttpc_pool_name, Config) + ), + emqx_s3_client:create(ClientConfig). + +profile_config(Config) -> + maps:put( + bucket, + ?config(bucket, Config), + emqx_s3_test_helpers:base_config(?config(conn_type, Config)) + ). + +data(Size) -> + iolist_to_binary([$a || _ <- lists:seq(1, Size)]). diff --git a/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl b/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl new file mode 100644 index 000000000..ce53525be --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_profile_conf_SUITE.erl @@ -0,0 +1,293 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_profile_conf_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(assertWaitEvent(Code, EventMatch, Timeout), + ?assertMatch( + {_, {ok, EventMatch}}, + ?wait_async_action( + Code, + EventMatch, + Timeout + ) + ) +). + +all() -> emqx_common_test_helpers:all(?MODULE). + +suite() -> [{timetrap, {minutes, 1}}]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_s3), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_s3). + +init_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:start_trace(), + TestAwsConfig = emqx_s3_test_helpers:aws_config(tcp), + + Bucket = emqx_s3_test_helpers:unique_bucket(), + ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig), + + ProfileBaseConfig = emqx_s3_test_helpers:base_config(tcp), + ProfileConfig = ProfileBaseConfig#{bucket => Bucket}, + ok = emqx_s3:start_profile(profile_id(), ProfileConfig), + + [{profile_config, ProfileConfig} | Config]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + _ = emqx_s3:stop_profile(profile_id()). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_regular_outdated_pool_cleanup(Config) -> + _ = process_flag(trap_exit, true), + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + [OldPool] = emqx_s3_profile_http_pools:all(profile_id()), + + ProfileBaseConfig = ?config(profile_config, Config), + ProfileConfig = emqx_map_lib:deep_put( + [transport_options, pool_size], ProfileBaseConfig, 16 + ), + ok = emqx_s3:update_profile(profile_id(), ProfileConfig), + + ?assertEqual( + 2, + length(emqx_s3_profile_http_pools:all(profile_id())) + ), + + ?assertWaitEvent( + ok = emqx_s3_uploader:abort(Pid), + #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool}, + 1000 + ), + + [NewPool] = emqx_s3_profile_http_pools:all(profile_id()), + + ?assertWaitEvent( + ok = emqx_s3:stop_profile(profile_id()), + #{?snk_kind := "s3_stop_http_pool", pool_name := NewPool}, + 1000 + ), + + ?assertEqual( + 0, + length(emqx_s3_profile_http_pools:all(profile_id())) + ). + +t_timeout_pool_cleanup(Config) -> + _ = process_flag(trap_exit, true), + + %% We restart the profile to set `http_pool_timeout` value suitable for test + ok = emqx_s3:stop_profile(profile_id()), + ProfileBaseConfig = ?config(profile_config, Config), + ProfileConfig = ProfileBaseConfig#{ + http_pool_timeout => 500, + http_pool_cleanup_interval => 100 + }, + ok = emqx_s3:start_profile(profile_id(), ProfileConfig), + + %% Start uploader + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + ok = emqx_s3_uploader:write(Pid, <<"data">>), + + [OldPool] = emqx_s3_profile_http_pools:all(profile_id()), + + NewProfileConfig = emqx_map_lib:deep_put( + [transport_options, pool_size], ProfileConfig, 16 + ), + + %% We update profile to create new pool and wait for the old one to be stopped by timeout + ?assertWaitEvent( + ok = emqx_s3:update_profile(profile_id(), NewProfileConfig), + #{?snk_kind := "s3_stop_http_pool", pool_name := OldPool}, + 1000 + ), + + %% The uploader now has no valid pool and should fail + ?assertMatch( + {error, _}, + emqx_s3_uploader:complete(Pid) + ). + +t_checkout_no_profile(_Config) -> + ?assertEqual( + {error, profile_not_found}, + emqx_s3_profile_conf:checkout_config(<<"no_such_profile">>) + ). + +t_httpc_pool_start_error(Config) -> + %% `ehhtpc_pool`s are lazy so it is difficult to trigger an error + %% passing some bad connection options. + %% So we emulate some unknown crash with `meck`. + meck:new(ehttpc_pool, [passthrough]), + meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end), + + ?assertMatch( + {error, _}, + emqx_s3:start_profile(<<"profile">>, ?config(profile_config, Config)) + ). + +t_httpc_pool_update_error(Config) -> + %% `ehhtpc_pool`s are lazy so it is difficult to trigger an error + %% passing some bad connection options. + %% So we emulate some unknown crash with `meck`. + meck:new(ehttpc_pool, [passthrough]), + meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end), + + ProfileBaseConfig = ?config(profile_config, Config), + NewProfileConfig = emqx_map_lib:deep_put( + [transport_options, pool_size], ProfileBaseConfig, 16 + ), + + ?assertMatch( + {error, _}, + emqx_s3:start_profile(<<"profile">>, NewProfileConfig) + ). + +t_orphaned_pools_cleanup(_Config) -> + ProfileId = profile_id(), + Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}), + + %% We kill conf and wait for it to restart + %% and create a new pool + ?assertWaitEvent( + exit(Pid, kill), + #{?snk_kind := "s3_start_http_pool", profile_id := ProfileId}, + 1000 + ), + + %% We should still have only one pool + ?assertEqual( + 1, + length(emqx_s3_profile_http_pools:all(ProfileId)) + ). + +t_orphaned_pools_cleanup_non_graceful(_Config) -> + ProfileId = profile_id(), + Pid = gproc:where({n, l, emqx_s3_profile_conf:id(ProfileId)}), + + %% We stop pool, conf server should not fail when attempting to stop it once more + [PoolName] = emqx_s3_profile_http_pools:all(ProfileId), + ok = ehttpc_pool:stop_pool(PoolName), + + %% We kill conf and wait for it to restart + %% and create a new pool + ?assertWaitEvent( + exit(Pid, kill), + #{?snk_kind := "s3_start_http_pool", profile_id := ProfileId}, + 1000 + ), + + %% We should still have only one pool + ?assertEqual( + 1, + length(emqx_s3_profile_http_pools:all(ProfileId)) + ). + +t_checkout_client(Config) -> + ProfileId = profile_id(), + Key = emqx_s3_test_helpers:unique_key(), + Caller = self(), + Pid = spawn_link(fun() -> + emqx_s3:with_client( + ProfileId, + fun(Client) -> + receive + put_object -> + Caller ! {put_object, emqx_s3_client:put_object(Client, Key, <<"data">>)} + end, + receive + list_objects -> + Caller ! {list_objects, emqx_s3_client:list(Client, [])} + end + end + ), + Caller ! client_released, + receive + stop -> ok + end + end), + + %% Ask spawned process to put object + Pid ! put_object, + receive + {put_object, ok} -> ok + after 1000 -> + ct:fail("put_object fail") + end, + + %% Now change config for the profile + ProfileBaseConfig = ?config(profile_config, Config), + NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>}, + NewProfileConfig1 = emqx_map_lib:deep_put( + [transport_options, pool_size], NewProfileConfig0, 16 + ), + ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1), + + %% We should have two pools now, because the old one is still in use + %% by the spawned process + ?assertEqual( + 2, + length(emqx_s3_profile_http_pools:all(ProfileId)) + ), + + %% Ask spawned process to list objects + Pid ! list_objects, + receive + {list_objects, Result} -> + {ok, OkResult} = Result, + Contents = proplists:get_value(contents, OkResult), + ?assertEqual(1, length(Contents)), + ?assertEqual(Key, proplists:get_value(key, hd(Contents))) + after 1000 -> + ct:fail("list_objects fail") + end, + + %% Wait till spawned process releases client + receive + client_released -> ok + after 1000 -> + ct:fail("client not released") + end, + + %% We should have only one pool now, because the old one is released + ?assertEqual( + 1, + length(emqx_s3_profile_http_pools:all(ProfileId)) + ). + +t_unknown_messages(_Config) -> + Pid = gproc:where({n, l, emqx_s3_profile_conf:id(profile_id())}), + + Pid ! unknown, + ok = gen_server:cast(Pid, unknown), + + ?assertEqual( + {error, not_implemented}, + gen_server:call(Pid, unknown) + ). + +%%-------------------------------------------------------------------- +%% Test helpers +%%-------------------------------------------------------------------- + +profile_id() -> + <<"test">>. diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl new file mode 100644 index 000000000..bba1a5ba8 --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_schema_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_minimal_config(_Config) -> + ?assertMatch( + #{ + bucket := "bucket", + host := "s3.us-east-1.endpoint.com", + port := 443, + acl := private, + min_part_size := 5242880, + transport_options := + #{ + connect_timeout := 15000, + enable_pipelining := 100, + pool_size := 8, + pool_type := random, + ssl := #{enable := false} + } + }, + emqx_s3_schema:translate(#{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443 + }) + ). + +t_full_config(_Config) -> + ?assertMatch( + #{ + access_key_id := "access_key_id", + acl := public_read, + bucket := "bucket", + host := "s3.us-east-1.endpoint.com", + min_part_size := 10485760, + port := 443, + secret_access_key := "secret_access_key", + transport_options := + #{ + connect_timeout := 30000, + enable_pipelining := 200, + headers := #{<<"x-amz-acl">> := <<"public-read">>}, + max_retries := 3, + pool_size := 10, + pool_type := random, + request_timeout := 10000, + ssl := + #{ + cacertfile := <<"cacertfile.crt">>, + certfile := <<"server.crt">>, + ciphers := ["ECDHE-RSA-AES256-GCM-SHA384"], + depth := 10, + enable := true, + keyfile := <<"server.key">>, + reuse_sessions := true, + secure_renegotiate := true, + server_name_indication := "some-host", + verify := verify_peer, + versions := ['tlsv1.2'] + } + } + }, + emqx_s3_schema:translate(#{ + <<"access_key_id">> => <<"access_key_id">>, + <<"secret_access_key">> => <<"secret_access_key">>, + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"min_part_size">> => <<"10mb">>, + <<"acl">> => <<"public_read">>, + <<"transport_options">> => #{ + <<"connect_timeout">> => 30000, + <<"enable_pipelining">> => 200, + <<"pool_size">> => 10, + <<"pool_type">> => <<"random">>, + <<"ssl">> => #{ + <<"enable">> => true, + <<"keyfile">> => <<"server.key">>, + <<"certfile">> => <<"server.crt">>, + <<"cacertfile">> => <<"cacertfile.crt">>, + <<"server_name_indication">> => <<"some-host">>, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.2">>], + <<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>] + }, + <<"request_timeout">> => <<"10s">>, + <<"max_retries">> => 3, + <<"headers">> => #{ + <<"x-amz-acl">> => <<"public-read">> + } + } + }) + ). + +t_invalid_limits(_Config) -> + ?assertException( + throw, + {emqx_s3_schema, [#{kind := validation_error, path := "s3.min_part_size"}]}, + emqx_s3_schema:translate(#{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"min_part_size">> => <<"1mb">> + }) + ), + + ?assertException( + throw, + {emqx_s3_schema, [#{kind := validation_error, path := "s3.min_part_size"}]}, + emqx_s3_schema:translate(#{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"min_part_size">> => <<"100000gb">> + }) + ), + + ?assertException( + throw, + {emqx_s3_schema, [#{kind := validation_error, path := "s3.max_part_size"}]}, + emqx_s3_schema:translate(#{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"max_part_size">> => <<"1mb">> + }) + ), + + ?assertException( + throw, + {emqx_s3_schema, [#{kind := validation_error, path := "s3.max_part_size"}]}, + emqx_s3_schema:translate(#{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"max_part_size">> => <<"100000gb">> + }) + ). diff --git a/apps/emqx_s3/test/emqx_s3_test_helpers.erl b/apps/emqx_s3/test/emqx_s3_test_helpers.erl new file mode 100644 index 000000000..c74e78a4d --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_test_helpers.erl @@ -0,0 +1,135 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_test_helpers). + +-compile(nowarn_export_all). +-compile(export_all). + +-define(ACCESS_KEY_ID, "minioadmin"). +-define(SECRET_ACCESS_KEY, "minioadmin"). + +-define(TOXIPROXY_HOST, "toxiproxy"). +-define(TOXIPROXY_PORT, 8474). + +-define(TCP_HOST, ?TOXIPROXY_HOST). +-define(TCP_PORT, 19000). +-define(TLS_HOST, ?TOXIPROXY_HOST). +-define(TLS_PORT, 19100). + +-include_lib("erlcloud/include/erlcloud_aws.hrl"). + +-export([ + aws_config/1, + base_raw_config/1, + base_config/1, + + unique_key/0, + unique_bucket/0, + + with_failure/3 +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +aws_config(tcp) -> + erlcloud_s3_new( + ?ACCESS_KEY_ID, + ?SECRET_ACCESS_KEY, + ?TCP_HOST, + ?TCP_PORT, + "http://" + ); +aws_config(tls) -> + erlcloud_s3_new( + ?ACCESS_KEY_ID, + ?SECRET_ACCESS_KEY, + ?TLS_HOST, + ?TLS_PORT, + "https://" + ). + +base_raw_config(tcp) -> + #{ + <<"bucket">> => <<"bucket">>, + <<"access_key_id">> => bin(?ACCESS_KEY_ID), + <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), + <<"host">> => ?TCP_HOST, + <<"port">> => ?TCP_PORT, + <<"max_part_size">> => 10 * 1024 * 1024, + <<"transport_options">> => + #{ + <<"request_timeout">> => 2000 + } + }; +base_raw_config(tls) -> + #{ + <<"bucket">> => <<"bucket">>, + <<"access_key_id">> => bin(?ACCESS_KEY_ID), + <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), + <<"host">> => ?TLS_HOST, + <<"port">> => ?TLS_PORT, + <<"max_part_size">> => 10 * 1024 * 1024, + <<"transport_options">> => + #{ + <<"request_timeout">> => 2000, + <<"ssl">> => #{ + <<"enable">> => true, + <<"cacertfile">> => bin(cert_path("ca.crt")), + <<"server_name_indication">> => <<"authn-server">>, + <<"verify">> => <<"verify_peer">> + } + } + }. + +base_config(ConnType) -> + emqx_s3_schema:translate(base_raw_config(ConnType)). + +unique_key() -> + "key-" ++ integer_to_list(erlang:system_time(millisecond)) ++ "-" ++ + integer_to_list(erlang:unique_integer([positive])). + +unique_bucket() -> + "bucket-" ++ integer_to_list(erlang:system_time(millisecond)) ++ "-" ++ + integer_to_list(erlang:unique_integer([positive])). + +with_failure(_ConnType, ehttpc_500, Fun) -> + try + meck:new(ehttpc, [passthrough, no_history]), + meck:expect(ehttpc, request, fun(_, _, _, _) -> {ok, 500, []} end), + Fun() + after + meck:unload(ehttpc) + end; +with_failure(ConnType, FailureType, Fun) -> + emqx_common_test_helpers:with_failure( + FailureType, + toxproxy_name(ConnType), + ?TOXIPROXY_HOST, + ?TOXIPROXY_PORT, + Fun + ). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +toxproxy_name(tcp) -> "minio_tcp"; +toxproxy_name(tls) -> "minio_tls". + +cert_path(FileName) -> + Dir = code:lib_dir(emqx_s3, test), + filename:join([Dir, <<"certs">>, FileName]). + +bin(String) when is_list(String) -> list_to_binary(String); +bin(Binary) when is_binary(Binary) -> Binary. + +erlcloud_s3_new(AccessKeyId, SecretAccessKey, Host, Port, Scheme) -> + AwsConfig = erlcloud_s3:new(AccessKeyId, SecretAccessKey, Host, Port), + AwsConfig#aws_config{ + s3_scheme = Scheme, + s3_bucket_access_method = path + }. diff --git a/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl b/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl new file mode 100644 index 000000000..ef1d916c6 --- /dev/null +++ b/apps/emqx_s3/test/emqx_s3_uploader_SUITE.erl @@ -0,0 +1,535 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_s3_uploader_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(assertProcessExited(Reason, Pid), + receive + {'DOWN', _, _, Pid, Reason} -> + % ct:print("uploader process exited with reason: ~p", [R]), + ok + after 3000 -> + ct:fail("uploader process did not exit") + end +). + +-define(assertObjectEqual(Value, AwsConfig, Bucket, Key), + ?assertEqual( + Value, + proplists:get_value( + content, + erlcloud_s3:get_object( + Bucket, + Key, + AwsConfig + ) + ) + ) +). + +all() -> + [ + {group, tcp}, + {group, tls} + ]. + +groups() -> + [ + {tcp, [ + {group, common_cases}, + {group, tcp_cases} + ]}, + {tls, [ + {group, common_cases}, + {group, tls_cases} + ]}, + {common_cases, [], [ + t_happy_path_simple_put, + t_happy_path_multi, + t_abort_multi, + t_abort_simple_put, + + {group, noconn_errors}, + {group, timeout_errors}, + {group, http_errors} + ]}, + + {tcp_cases, [ + t_config_switch, + t_config_switch_http_settings, + t_too_large, + t_no_profile + ]}, + + {tls_cases, [ + t_tls_error + ]}, + + {noconn_errors, [{group, transport_errors}]}, + {timeout_errors, [{group, transport_errors}]}, + {http_errors, [{group, transport_errors}]}, + + {transport_errors, [ + t_start_multipart_error, + t_upload_part_error, + t_complete_multipart_error, + t_abort_multipart_error, + t_put_object_error + ]} + ]. + +suite() -> [{timetrap, {minutes, 1}}]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(emqx_s3), + Config. + +end_per_suite(_Config) -> + ok = application:stop(emqx_s3). + +init_per_group(Group, Config) when Group =:= tcp orelse Group =:= tls -> + [{conn_type, Group} | Config]; +init_per_group(noconn_errors, Config) -> + [{failure, down} | Config]; +init_per_group(timeout_errors, Config) -> + [{failure, timeout} | Config]; +init_per_group(http_errors, Config) -> + [{failure, ehttpc_500} | Config]; +init_per_group(_ConnType, Config) -> + Config. + +end_per_group(_ConnType, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:start_trace(), + ConnType = ?config(conn_type, Config), + TestAwsConfig = emqx_s3_test_helpers:aws_config(ConnType), + + Bucket = emqx_s3_test_helpers:unique_bucket(), + ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig), + + ProfileBaseConfig = emqx_s3_test_helpers:base_config(ConnType), + ProfileConfig = ProfileBaseConfig#{bucket => Bucket}, + ok = emqx_s3:start_profile(profile_id(), ProfileConfig), + + [{bucket, Bucket}, {test_aws_config, TestAwsConfig}, {profile_config, ProfileConfig} | Config]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + _ = emqx_s3:stop_profile(profile_id()). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_happy_path_simple_put(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + Data = data($a, 1024, 10), + + lists:foreach( + fun(Chunk) -> + ?assertEqual( + ok, + emqx_s3_uploader:write(Pid, Chunk) + ) + end, + Data + ), + + ok = emqx_s3_uploader:complete(Pid), + + ?assertProcessExited( + normal, + Pid + ), + + ?assertObjectEqual( + iolist_to_binary(Data), + ?config(test_aws_config, Config), + ?config(bucket, Config), + Key + ). + +t_happy_path_multi(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + Data = data($a, 1024 * 1024, 10), + + lists:foreach( + fun(Chunk) -> + ?assertEqual( + ok, + emqx_s3_uploader:write(Pid, Chunk) + ) + end, + Data + ), + + ok = emqx_s3_uploader:complete(Pid), + + ?assertProcessExited( + normal, + Pid + ), + + ?assertObjectEqual( + iolist_to_binary(Data), + ?config(test_aws_config, Config), + ?config(bucket, Config), + Key + ). + +t_abort_multi(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 6 * 1024 * 1024, 1), + + ok = emqx_s3_uploader:write(Pid, Data), + + ?assertMatch( + [], + list_objects(Config) + ), + + ok = emqx_s3_uploader:abort(Pid), + + ?assertMatch( + [], + list_objects(Config) + ), + + ?assertProcessExited( + normal, + Pid + ). + +t_abort_simple_put(_Config) -> + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 10 * 1024, 1), + + ok = emqx_s3_uploader:write(Pid, Data), + + ok = emqx_s3_uploader:abort(Pid), + + ?assertProcessExited( + normal, + Pid + ). + +t_config_switch(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + OldBucket = ?config(bucket, Config), + {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + [Data0, Data1] = data($a, 6 * 1024 * 1024, 2), + + ok = emqx_s3_uploader:write(Pid0, Data0), + + %% Switch to the new config, but without changing HTTP settings + ProfileConfig = ?config(profile_config, Config), + NewBucket = emqx_s3_test_helpers:unique_bucket(), + ok = erlcloud_s3:create_bucket(NewBucket, ?config(test_aws_config, Config)), + NewProfileConfig = ProfileConfig#{bucket => NewBucket}, + + ok = emqx_s3:update_profile(profile_id(), NewProfileConfig), + + %% Already started uploader should be OK and use previous config + ok = emqx_s3_uploader:write(Pid0, Data1), + ok = emqx_s3_uploader:complete(Pid0), + + ?assertObjectEqual( + iolist_to_binary([Data0, Data1]), + ?config(test_aws_config, Config), + OldBucket, + Key + ), + + %% Now check that new uploader uses new config + {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + ok = emqx_s3_uploader:write(Pid1, Data0), + ok = emqx_s3_uploader:complete(Pid1), + + ?assertObjectEqual( + iolist_to_binary(Data0), + ?config(test_aws_config, Config), + NewBucket, + Key + ). + +t_config_switch_http_settings(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + OldBucket = ?config(bucket, Config), + {ok, Pid0} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + [Data0, Data1] = data($a, 6 * 1024 * 1024, 2), + + ok = emqx_s3_uploader:write(Pid0, Data0), + + %% Switch to the new config, completely changing HTTP settings (tcp -> tls) + NewBucket = emqx_s3_test_helpers:unique_bucket(), + NewTestAwsConfig = emqx_s3_test_helpers:aws_config(tls), + ok = erlcloud_s3:create_bucket(NewBucket, NewTestAwsConfig), + NewProfileConfig0 = emqx_s3_test_helpers:base_config(tls), + NewProfileConfig1 = NewProfileConfig0#{bucket => NewBucket}, + + ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1), + + %% Already started uploader should be OK and use previous config + ok = emqx_s3_uploader:write(Pid0, Data1), + ok = emqx_s3_uploader:complete(Pid0), + + ?assertObjectEqual( + iolist_to_binary([Data0, Data1]), + ?config(test_aws_config, Config), + OldBucket, + Key + ), + + %% Now check that new uploader uses new config + {ok, Pid1} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + ok = emqx_s3_uploader:write(Pid1, Data0), + ok = emqx_s3_uploader:complete(Pid1), + + ?assertObjectEqual( + iolist_to_binary(Data0), + NewTestAwsConfig, + NewBucket, + Key + ). + +t_start_multipart_error(Config) -> + _ = process_flag(trap_exit, true), + + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 6 * 1024 * 1024, 1), + + emqx_s3_test_helpers:with_failure( + ?config(conn_type, Config), + ?config(failure, Config), + fun() -> + ?assertMatch( + {error, _}, + emqx_s3_uploader:write(Pid, Data) + ) + end + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_upload_part_error(Config) -> + _ = process_flag(trap_exit, true), + + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data0, Data1] = data($a, 6 * 1024 * 1024, 2), + + ok = emqx_s3_uploader:write(Pid, Data0), + + emqx_s3_test_helpers:with_failure( + ?config(conn_type, Config), + ?config(failure, Config), + fun() -> + ?assertMatch( + {error, _}, + emqx_s3_uploader:write(Pid, Data1) + ) + end + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_abort_multipart_error(Config) -> + _ = process_flag(trap_exit, true), + + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 6 * 1024 * 1024, 1), + + ok = emqx_s3_uploader:write(Pid, Data), + + emqx_s3_test_helpers:with_failure( + ?config(conn_type, Config), + ?config(failure, Config), + fun() -> + ?assertMatch( + {error, _}, + emqx_s3_uploader:abort(Pid) + ) + end + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_complete_multipart_error(Config) -> + _ = process_flag(trap_exit, true), + + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 6 * 1024 * 1024, 1), + + ok = emqx_s3_uploader:write(Pid, Data), + + emqx_s3_test_helpers:with_failure( + ?config(conn_type, Config), + ?config(failure, Config), + fun() -> + ?assertMatch( + {error, _}, + emqx_s3_uploader:complete(Pid) + ) + end + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_put_object_error(Config) -> + _ = process_flag(trap_exit, true), + + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + %% Little data to avoid multipart upload + [Data] = data($a, 1024, 1), + + emqx_s3_test_helpers:with_failure( + ?config(conn_type, Config), + ?config(failure, Config), + fun() -> + ok = emqx_s3_uploader:write(Pid, Data), + ?assertMatch( + {error, _}, + emqx_s3_uploader:complete(Pid) + ) + end + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_too_large(Config) -> + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 1024, 1), + + [DataLarge] = data($a, 20 * 1024 * 1024, 1), + + ?assertMatch( + {error, {too_large, _}}, + emqx_s3_uploader:write(Pid, DataLarge) + ), + + ok = emqx_s3_uploader:write(Pid, Data), + ok = emqx_s3_uploader:complete(Pid), + + ?assertProcessExited( + normal, + Pid + ), + + ?assertObjectEqual( + iolist_to_binary(Data), + ?config(test_aws_config, Config), + ?config(bucket, Config), + Key + ). + +t_tls_error(Config) -> + _ = process_flag(trap_exit, true), + + ProfileBaseConfig = ?config(profile_config, Config), + ProfileConfig = emqx_map_lib:deep_put( + [transport_options, ssl, server_name_indication], ProfileBaseConfig, "invalid-hostname" + ), + ok = emqx_s3:update_profile(profile_id(), ProfileConfig), + Key = emqx_s3_test_helpers:unique_key(), + {ok, Pid} = emqx_s3:start_uploader(profile_id(), #{key => Key}), + + _ = erlang:monitor(process, Pid), + + [Data] = data($a, 6 * 1024 * 1024, 1), + + ?assertMatch( + {error, _}, + emqx_s3_uploader:write(Pid, Data) + ), + + ?assertProcessExited( + {error, _}, + Pid + ). + +t_no_profile(_Config) -> + Key = emqx_s3_test_helpers:unique_key(), + ?assertMatch( + {error, profile_not_found}, + emqx_s3:start_uploader(<<"no-profile">>, #{key => Key}) + ). + +%%-------------------------------------------------------------------- +%% Test helpers +%%-------------------------------------------------------------------- + +profile_id() -> + <<"test">>. + +data(Byte, ChunkSize, ChunkCount) -> + Chunk = iolist_to_binary([Byte || _ <- lists:seq(1, ChunkSize)]), + [Chunk || _ <- lists:seq(1, ChunkCount)]. + +list_objects(Config) -> + Props = erlcloud_s3:list_objects(?config(bucket, Config), [], ?config(test_aws_config, Config)), + proplists:get_value(contents, Props). diff --git a/rebar.config.erl b/rebar.config.erl index ea0016ca9..51a6946dc 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -401,7 +401,8 @@ relx_apps(ReleaseType, Edition) -> emqx_psk, emqx_slow_subs, emqx_plugins, - emqx_ft + emqx_ft, + emqx_s3 ] ++ [quicer || is_quicer_supported()] ++ [bcrypt || provide_bcrypt_release(ReleaseType)] ++ diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 82823720d..9644ec8b9 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -91,6 +91,12 @@ if [ "${WHICH_APP}" = 'novalue' ]; then exit 1 fi +if [ ! -d "${WHICH_APP}" ]; then + echo "must provide an existing path for --app arg" + help + exit 1 +fi + if [[ "${WHICH_APP}" == lib-ee* && (-z "${PROFILE+x}" || "${PROFILE}" != emqx-enterprise) ]]; then echo 'You are trying to run an enterprise test case without the emqx-enterprise profile.' echo 'This will most likely not work.' @@ -172,10 +178,14 @@ for dep in ${CT_DEPS}; do ;; rocketmq) FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) - ;; + ;; cassandra) FILES+=( '.ci/docker-compose-file/docker-compose-cassandra.yaml' ) ;; + minio) + FILES+=( '.ci/docker-compose-file/docker-compose-minio-tcp.yaml' + '.ci/docker-compose-file/docker-compose-minio-tls.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1