diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 956750e00..9675937d8 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -7,5 +7,6 @@ INFLUXDB_TAG=2.5.0 TDENGINE_TAG=3.0.2.4 DYNAMO_TAG=1.21.0 CASSANDRA_TAG=3.11.6 +MINIO_TAG=RELEASE.2023-03-20T20-16-18Z TARGET=emqx/emqx diff --git a/.ci/docker-compose-file/docker-compose-minio-tcp.yaml b/.ci/docker-compose-file/docker-compose-minio-tcp.yaml index 93e1c4ead..fa78e4426 100644 --- a/.ci/docker-compose-file/docker-compose-minio-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-minio-tcp.yaml @@ -3,7 +3,7 @@ version: '3.7' services: minio: hostname: minio - image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z + image: quay.io/minio/minio:${MINIO_TAG} command: server --address ":9000" --console-address ":9001" /minio-data expose: - "9000" diff --git a/.ci/docker-compose-file/docker-compose-minio-tls.yaml b/.ci/docker-compose-file/docker-compose-minio-tls.yaml index 2e7a6bea5..4999cccb5 100644 --- a/.ci/docker-compose-file/docker-compose-minio-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-minio-tls.yaml @@ -3,7 +3,7 @@ version: '3.7' services: minio_tls: hostname: minio-tls - image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z + image: quay.io/minio/minio:${MINIO_TAG} command: server --certs-dir /etc/certs --address ":9100" --console-address ":9101" /minio-data volumes: - ./certs/server.crt:/etc/certs/public.crt diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 8702cd849..3360bec6c 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -168,6 +168,7 @@ jobs: REDIS_TAG: "7.0" INFLUXDB_TAG: "2.5.0" TDENGINE_TAG: "3.0.2.4" + MINIO_TAG: "RELEASE.2023-03-20T20-16-18Z" PROFILE: ${{ matrix.profile }} CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 6ce10408e..0e8bcc193 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -20,6 +20,8 @@ -behaviour(emqx_config_handler). +-include_lib("emqx/include/logger.hrl"). + %% Accessors -export([storage/0]). -export([gc_interval/1]). diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index 3258a5cd1..24c3c52e5 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -49,7 +49,7 @@ transfer := transfer() }. --define(S3_PROFILE_ID, <<"emqx_ft_storage_exporter_s3">>). +-define(S3_PROFILE_ID, ?MODULE). -define(FILEMETA_VSN, <<"1">>). -define(S3_LIST_LIMIT, 500). @@ -66,6 +66,7 @@ start_export(_Options, Transfer, Filemeta) -> }, case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of {ok, Pid} -> + true = erlang:link(Pid), {ok, #{filemeta => Filemeta, pid => Pid}}; {error, _Reason} = Error -> Error @@ -151,7 +152,7 @@ list_key_info(Client, Options, Marker, Acc) -> ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker, case emqx_s3_client:list(Client, ListOptions) of {ok, Result} -> - ?SLOG(warning, #{msg => "list_key_info", result => Result}), + ?SLOG(debug, #{msg => "list_key_info", result => Result}), KeyInfos = proplists:get_value(contents, Result, []), case proplists:get_value(is_truncated, Result, false) of true -> diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 8798b608d..15f63fbd5 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -16,10 +16,11 @@ -export_type([ profile_id/0, - profile_config/0 + profile_config/0, + acl/0 ]). --type profile_id() :: term(). +-type profile_id() :: atom() | binary(). -type acl() :: private @@ -30,9 +31,9 @@ | bucket_owner_full_control. -type transport_options() :: #{ + headers => map(), connect_timeout => pos_integer(), enable_pipelining => pos_integer(), - headers => map(), max_retries => pos_integer(), pool_size => pos_integer(), pool_type => atom(), @@ -51,12 +52,14 @@ transport_options => transport_options() }. +-define(IS_PROFILE_ID(ProfileId), (is_atom(ProfileId) orelse is_binary(ProfileId))). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()). -start_profile(ProfileId, ProfileConfig) -> +start_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) -> case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of {ok, _} -> ok; @@ -65,21 +68,21 @@ start_profile(ProfileId, ProfileConfig) -> end. -spec stop_profile(profile_id()) -> ok_or_error(term()). -stop_profile(ProfileId) -> +stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) -> emqx_s3_sup:stop_profile(ProfileId). -spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()). -update_profile(ProfileId, ProfileConfig) -> +update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) -> emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig). -spec start_uploader(profile_id(), emqx_s3_uploader:opts()) -> supervisor:start_ret() | {error, profile_not_found}. -start_uploader(ProfileId, Opts) -> +start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) -> emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts). -spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) -> {error, profile_not_found} | Result. -with_client(ProfileId, Fun) when is_function(Fun, 1) -> +with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) -> case emqx_s3_profile_conf:checkout_config(ProfileId) of {ok, ClientConfig, _UploadConfig} -> try diff --git a/apps/emqx_s3/src/emqx_s3.hrl b/apps/emqx_s3/src/emqx_s3.hrl new file mode 100644 index 000000000..9a2cf8d2f --- /dev/null +++ b/apps/emqx_s3/src/emqx_s3.hrl @@ -0,0 +1,13 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-define(VIA_GPROC(Id), {via, gproc, {n, l, Id}}). + +-define(SAFE_CALL_VIA_GPROC(Id, Message, Timeout, NoProcError), + try gen_server:call(?VIA_GPROC(Id), Message, Timeout) of + Result -> Result + catch + exit:{noproc, _} -> {error, NoProcError} + end +). diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index 8639e0a25..c84f32dc3 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -31,22 +31,14 @@ headers/0 ]). --type s3_bucket_acl() :: - private - | public_read - | public_read_write - | authenticated_read - | bucket_owner_read - | bucket_owner_full_control. - --type headers() :: #{binary() | string() => binary() | string()}. +-type headers() :: #{binary() | string() => iodata()}. -type key() :: string(). -type part_number() :: non_neg_integer(). -type upload_id() :: string(). -type etag() :: string(). --type upload_options() :: list({acl, s3_bucket_acl()}). +-type upload_options() :: list({acl, emqx_s3:acl()}). -opaque client() :: #{ aws_config := aws_config(), @@ -61,11 +53,11 @@ port := part_number(), bucket := string(), headers := headers(), - acl := s3_bucket_acl(), + acl := emqx_s3:acl(), url_expire_time := pos_integer(), access_key_id := string() | undefined, secret_access_key := string() | undefined, - http_pool := ecpool:pool_name(), + http_pool := ehttpc:pool_name(), request_timeout := timeout() }. @@ -97,7 +89,7 @@ put_object( Value ) -> AllHeaders = join_headers(Headers, SpecialHeaders), - try erlcloud_s3:put_object(Bucket, key(Key), Value, Options, AllHeaders, AwsConfig) of + try erlcloud_s3:put_object(Bucket, erlcloud_key(Key), Value, Options, AllHeaders, AwsConfig) of Props when is_list(Props) -> ok catch @@ -117,9 +109,9 @@ start_multipart( Key ) -> AllHeaders = join_headers(Headers, SpecialHeaders), - case erlcloud_s3:start_multipart(Bucket, key(Key), Options, AllHeaders, AwsConfig) of + case erlcloud_s3:start_multipart(Bucket, erlcloud_key(Key), Options, AllHeaders, AwsConfig) of {ok, Props} -> - {ok, proplists:get_value('uploadId', Props)}; + {ok, response_property('uploadId', Props)}; {error, Reason} -> ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), {error, Reason} @@ -135,10 +127,12 @@ upload_part( Value ) -> case - erlcloud_s3:upload_part(Bucket, key(Key), UploadId, PartNumber, Value, Headers, AwsConfig) + erlcloud_s3:upload_part( + Bucket, erlcloud_key(Key), UploadId, PartNumber, Value, Headers, AwsConfig + ) of {ok, Props} -> - {ok, proplists:get_value(etag, Props)}; + {ok, response_property(etag, Props)}; {error, Reason} -> ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}), {error, Reason} @@ -151,7 +145,11 @@ complete_multipart( UploadId, ETags ) -> - case erlcloud_s3:complete_multipart(Bucket, key(Key), UploadId, ETags, Headers, AwsConfig) of + case + erlcloud_s3:complete_multipart( + Bucket, erlcloud_key(Key), UploadId, ETags, Headers, AwsConfig + ) + of ok -> ok; {error, Reason} -> @@ -161,7 +159,7 @@ complete_multipart( -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> - case erlcloud_s3:abort_multipart(Bucket, key(Key), UploadId, [], Headers, AwsConfig) of + case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of ok -> ok; {error, Reason} -> @@ -181,7 +179,7 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) -> -spec uri(client(), key()) -> iodata(). uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) -> - erlcloud_s3:make_get_url(ExpireTime, Bucket, key(Key), AwsConfig). + erlcloud_s3:make_get_url(ExpireTime, Bucket, erlcloud_key(Key), AwsConfig). -spec format(client()) -> term(). format(#{aws_config := AwsConfig} = Client) -> @@ -197,7 +195,7 @@ upload_options(Config) -> ]. headers(#{headers := Headers}) -> - string_headers(maps:to_list(Headers)); + headers_user_to_erlcloud_request(Headers); headers(#{}) -> []. @@ -230,7 +228,9 @@ aws_config(#{ request_fun(HttpPool) -> fun(Url, Method, Headers, Body, Timeout, _Config) -> with_path_and_query_only(Url, fun(PathQuery) -> - Request = make_request(Method, PathQuery, binary_headers(Headers), Body), + Request = make_request( + Method, PathQuery, headers_erlcloud_request_to_ehttpc(Headers), Body + ), ?SLOG(debug, #{ msg => "s3_ehttpc_request", timeout => Timeout, @@ -243,6 +243,13 @@ request_fun(HttpPool) -> end. ehttpc_request(HttpPool, Method, Request, Timeout) -> + ?SLOG(debug, #{ + msg => "s3_ehttpc_request", + timeout => Timeout, + pool => HttpPool, + method => Method, + request => format_request(Request) + }), try ehttpc:request(HttpPool, Method, Request, Timeout) of {ok, StatusCode, RespHeaders} -> ?SLOG(debug, #{ @@ -250,7 +257,9 @@ ehttpc_request(HttpPool, Method, Request, Timeout) -> status_code => StatusCode, headers => RespHeaders }), - {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), undefined}}; + {ok, { + {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined + }}; {ok, StatusCode, RespHeaders, RespBody} -> ?SLOG(debug, #{ msg => "s3_ehttpc_request_ok", @@ -258,7 +267,9 @@ ehttpc_request(HttpPool, Method, Request, Timeout) -> headers => RespHeaders, body => RespBody }), - {ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), RespBody}}; + {ok, { + {StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody + }}; {error, Reason} -> ?SLOG(error, #{ msg => "s3_ehttpc_request_fail", @@ -290,10 +301,10 @@ ehttpc_request(HttpPool, Method, Request, Timeout) -> end. -define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)). --define(NEEDS_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)). +-define(NEEDS_NO_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)). make_request(Method, PathQuery, Headers, Body) when - ?IS_BODY_EMPTY(Body) andalso ?NEEDS_BODY(Method) + ?IS_BODY_EMPTY(Body) andalso ?NEEDS_NO_BODY(Method) -> {PathQuery, Headers}; make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) -> @@ -301,7 +312,8 @@ make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) -> make_request(_Method, PathQuery, Headers, Body) -> {PathQuery, Headers, Body}. -format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}. +format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}; +format_request({PathQuery, Headers}) -> {PathQuery, Headers}. with_path_and_query_only(Url, Fun) -> case string:split(Url, "//", leading) of @@ -316,17 +328,41 @@ with_path_and_query_only(Url, Fun) -> {error, {invalid_url, Url}} end. -string_headers(Headers) -> - [{to_list_string(K), to_list_string(V)} || {K, V} <- Headers]. +%% We need some header conversions to tie the emqx_s3, erlcloud and ehttpc APIs together. -erlcloud_string_headers(Headers) -> - [{string:to_lower(K), V} || {K, V} <- string_headers(Headers)]. +%% The request header flow is: -binary_headers(Headers) -> - [{to_binary(K), V} || {K, V} <- Headers]. +%% UserHeaders -> [emqx_s3_client API] -> ErlcloudRequestHeaders0 -> +%% -> [erlcloud API] -> ErlcloudRequestHeaders1 -> [emqx_s3_client injected request_fun] -> +%% -> EhttpcRequestHeaders -> [ehttpc API] -join_headers(Headers, SpecialHeaders) -> - Headers ++ string_headers(maps:to_list(SpecialHeaders)). +%% The response header flow is: + +%% [ehttpc API] -> EhttpcResponseHeaders -> [emqx_s3_client injected request_fun] -> +%% -> ErlcloudResponseHeaders0 -> [erlcloud API] -> [emqx_s3_client API] + +%% UserHeders (emqx_s3 API headers) are maps with string/binary keys. +%% ErlcloudRequestHeaders are lists of tuples with string keys and iodata values +%% ErlcloudResponseHeders are lists of tuples with lower case string keys and iodata values. +%% EhttpcHeaders are lists of tuples with binary keys and iodata values. + +%% Users provide headers as a map, but erlcloud expects a list of tuples with string keys and values. +headers_user_to_erlcloud_request(UserHeaders) -> + [{to_list_string(K), V} || {K, V} <- maps:to_list(UserHeaders)]. + +%% Ehttpc returns operates on headers as a list of tuples with binary keys. +%% Erlcloud expects a list of tuples with string values and lowcase string keys +%% from the underlying http library. +headers_ehttpc_to_erlcloud_response(EhttpcHeaders) -> + [{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- EhttpcHeaders]. + +%% Ehttpc expects a list of tuples with binary keys. +%% Erlcloud provides a list of tuples with string keys. +headers_erlcloud_request_to_ehttpc(ErlcloudHeaders) -> + [{to_binary(K), V} || {K, V} <- ErlcloudHeaders]. + +join_headers(ErlcloudHeaders, UserSpecialHeaders) -> + ErlcloudHeaders ++ headers_user_to_erlcloud_request(UserSpecialHeaders). to_binary(Val) when is_list(Val) -> list_to_binary(Val); to_binary(Val) when is_binary(Val) -> Val. @@ -336,5 +372,19 @@ to_list_string(Val) when is_binary(Val) -> to_list_string(Val) when is_list(Val) -> Val. -key(Characters) -> +erlcloud_key(Characters) -> binary_to_list(unicode:characters_to_binary(Characters)). + +response_property(Name, Props) -> + case proplists:get_value(Name, Props) of + undefined -> + %% This schould not happen for valid S3 implementations + ?SLOG(error, #{ + msg => "missing_s3_response_property", + name => Name, + props => Props + }), + error({missing_s3_response_property, Name}); + Value -> + Value + end. diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 3c5eb8f36..3ab023d4d 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -11,6 +11,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include("src/emqx_s3.hrl"). + -export([ start_link/2, child_spec/2 @@ -47,6 +49,10 @@ -define(DEFAULT_HTTP_POOL_TIMEOUT, 60000). -define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000). +-define(SAFE_CALL_VIA_GPROC(ProfileId, Message, Timeout), + ?SAFE_CALL_VIA_GPROC(id(ProfileId), Message, Timeout, profile_not_found) +). + -spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec(). child_spec(ProfileId, ProfileConfig) -> #{ @@ -60,7 +66,7 @@ child_spec(ProfileId, ProfileConfig) -> -spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> gen_server:start_ret(). start_link(ProfileId, ProfileConfig) -> - gen_server:start_link(?MODULE, [ProfileId, ProfileConfig], []). + gen_server:start_link(?VIA_GPROC(id(ProfileId)), ?MODULE, [ProfileId, ProfileConfig], []). -spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config()) -> ok_or_error(term()). update_config(ProfileId, ProfileConfig) -> @@ -69,12 +75,7 @@ update_config(ProfileId, ProfileConfig) -> -spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config(), timeout()) -> ok_or_error(term()). update_config(ProfileId, ProfileConfig, Timeout) -> - case gproc:where({n, l, id(ProfileId)}) of - undefined -> - {error, profile_not_found}; - Pid -> - gen_server:call(Pid, {update_config, ProfileConfig}, Timeout) - end. + ?SAFE_CALL_VIA_GPROC(ProfileId, {update_config, ProfileConfig}, Timeout). -spec checkout_config(emqx_s3:profile_id()) -> {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. @@ -84,12 +85,7 @@ checkout_config(ProfileId) -> -spec checkout_config(emqx_s3:profile_id(), timeout()) -> {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. checkout_config(ProfileId, Timeout) -> - case gproc:where({n, l, id(ProfileId)}) of - undefined -> - {error, profile_not_found}; - Pid -> - gen_server:call(Pid, {checkout_config, self()}, Timeout) - end. + ?SAFE_CALL_VIA_GPROC(ProfileId, {checkout_config, self()}, Timeout). -spec checkin_config(emqx_s3:profile_id()) -> ok | {error, profile_not_found}. checkin_config(ProfileId) -> @@ -97,12 +93,7 @@ checkin_config(ProfileId) -> -spec checkin_config(emqx_s3:profile_id(), timeout()) -> ok | {error, profile_not_found}. checkin_config(ProfileId, Timeout) -> - case gproc:where({n, l, id(ProfileId)}) of - undefined -> - {error, profile_not_found}; - Pid -> - gen_server:call(Pid, {checkin_config, self()}, Timeout) - end. + ?SAFE_CALL_VIA_GPROC(ProfileId, {checkin_config, self()}, Timeout). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -110,10 +101,9 @@ checkin_config(ProfileId, Timeout) -> init([ProfileId, ProfileConfig]) -> _ = process_flag(trap_exit, true), - ok = cleanup_orphaned_pools(ProfileId), + ok = cleanup_profile_pools(ProfileId), case start_http_pool(ProfileId, ProfileConfig) of {ok, PoolName} -> - true = gproc:reg({n, l, id(ProfileId)}, ignored), HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig), {ok, #{ profile_id => ProfileId, @@ -188,12 +178,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #{profile_id := ProfileId}) -> - lists:foreach( - fun(PoolName) -> - ok = stop_http_pool(ProfileId, PoolName) - end, - emqx_s3_profile_http_pools:all(ProfileId) - ). + cleanup_profile_pools(ProfileId). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -263,12 +248,14 @@ update_http_pool(ProfileId, ProfileConfig, #{pool_name := OldPoolName} = State) pool_name(ProfileId) -> iolist_to_binary([ <<"s3-http-">>, - ProfileId, + profile_id_to_bin(ProfileId), <<"-">>, integer_to_binary(erlang:system_time(millisecond)), <<"-">>, integer_to_binary(erlang:unique_integer([positive])) ]). +profile_id_to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +profile_id_to_bin(Bin) when is_binary(Bin) -> Bin. old_http_config(#{profile_config := ProfileConfig}) -> http_config(ProfileConfig). @@ -278,7 +265,7 @@ set_old_pool_outdated(#{ _ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout), ok. -cleanup_orphaned_pools(ProfileId) -> +cleanup_profile_pools(ProfileId) -> lists:foreach( fun(PoolName) -> ok = stop_http_pool(ProfileId, PoolName) diff --git a/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl index 1cd155a77..fb7b93a15 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_uploader_sup.erl @@ -7,6 +7,9 @@ -behaviour(supervisor). -include_lib("emqx/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-include("src/emqx_s3.hrl"). -export([ start_link/1, @@ -23,7 +26,7 @@ -spec start_link(emqx_s3:profile_id()) -> supervisor:start_ret(). start_link(ProfileId) -> - supervisor:start_link(?MODULE, [ProfileId]). + supervisor:start_link(?VIA_GPROC(id(ProfileId)), ?MODULE, [ProfileId]). -spec child_spec(emqx_s3:profile_id()) -> supervisor:child_spec(). child_spec(ProfileId) -> @@ -43,10 +46,10 @@ id(ProfileId) -> -spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) -> supervisor:start_ret() | {error, profile_not_found}. start_uploader(ProfileId, Opts) -> - Id = id(ProfileId), - case gproc:where({n, l, Id}) of - undefined -> {error, profile_not_found}; - Pid -> supervisor:start_child(Pid, [Opts]) + try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Opts]) of + Result -> Result + catch + exit:{noproc, _} -> {error, profile_not_found} end. %%-------------------------------------------------------------------- @@ -54,7 +57,6 @@ start_uploader(ProfileId, Opts) -> %%------------------------------------------------------------------- init([ProfileId]) -> - true = gproc:reg({n, l, id(ProfileId)}, ignored), SupFlags = #{ strategy => simple_one_for_one, intensity => 10, diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index f6414669d..8327462c7 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -56,13 +56,15 @@ %% 5GB -define(DEFAULT_MAX_PART_SIZE, 5368709120). +-define(DEFAULT_TIMEOUT, 30000). + -spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret(). start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> gen_statem:start_link(?MODULE, [ProfileId, Opts], []). -spec write(pid(), iodata()) -> ok_or_error(term()). write(Pid, WriteData) -> - write(Pid, WriteData, infinity). + write(Pid, WriteData, ?DEFAULT_TIMEOUT). -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()). write(Pid, WriteData, Timeout) -> @@ -70,7 +72,7 @@ write(Pid, WriteData, Timeout) -> -spec complete(pid()) -> ok_or_error(term()). complete(Pid) -> - complete(Pid, infinity). + complete(Pid, ?DEFAULT_TIMEOUT). -spec complete(pid(), timeout()) -> ok_or_error(term()). complete(Pid, Timeout) -> @@ -78,7 +80,7 @@ complete(Pid, Timeout) -> -spec abort(pid()) -> ok_or_error(term()). abort(Pid) -> - abort(Pid, infinity). + abort(Pid, ?DEFAULT_TIMEOUT). -spec abort(pid(), timeout()) -> ok_or_error(term()). abort(Pid, Timeout) -> @@ -237,7 +239,7 @@ upload_part( etags := ETags } = Data ) -> - case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of + case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of {ok, ETag} -> NewData = Data#{ buffer => [], @@ -298,7 +300,7 @@ put_object( headers := Headers } ) -> - case emqx_s3_client:put_object(Client, Headers, Key, lists:reverse(Buffer)) of + case emqx_s3_client:put_object(Client, Headers, Key, Buffer) of ok -> ok; {error, _} = Error -> @@ -308,7 +310,7 @@ put_object( -spec append_buffer(data(), iodata()) -> data(). append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> Data#{ - buffer => [WriteData | Buffer], + buffer => [Buffer, WriteData], buffer_size => BufferSize + iolist_size(WriteData) }.