fix(ft-s3): fix review comments

This commit is contained in:
Ilya Averyanov 2023-04-04 23:11:33 +03:00
parent d7a85242de
commit be99242e32
12 changed files with 151 additions and 89 deletions

View File

@ -7,5 +7,6 @@ INFLUXDB_TAG=2.5.0
TDENGINE_TAG=3.0.2.4 TDENGINE_TAG=3.0.2.4
DYNAMO_TAG=1.21.0 DYNAMO_TAG=1.21.0
CASSANDRA_TAG=3.11.6 CASSANDRA_TAG=3.11.6
MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
TARGET=emqx/emqx TARGET=emqx/emqx

View File

@ -3,7 +3,7 @@ version: '3.7'
services: services:
minio: minio:
hostname: minio hostname: minio
image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z image: quay.io/minio/minio:${MINIO_TAG}
command: server --address ":9000" --console-address ":9001" /minio-data command: server --address ":9000" --console-address ":9001" /minio-data
expose: expose:
- "9000" - "9000"

View File

@ -3,7 +3,7 @@ version: '3.7'
services: services:
minio_tls: minio_tls:
hostname: minio-tls hostname: minio-tls
image: quay.io/minio/minio:RELEASE.2023-03-20T20-16-18Z image: quay.io/minio/minio:${MINIO_TAG}
command: server --certs-dir /etc/certs --address ":9100" --console-address ":9101" /minio-data command: server --certs-dir /etc/certs --address ":9100" --console-address ":9101" /minio-data
volumes: volumes:
- ./certs/server.crt:/etc/certs/public.crt - ./certs/server.crt:/etc/certs/public.crt

View File

@ -168,6 +168,7 @@ jobs:
REDIS_TAG: "7.0" REDIS_TAG: "7.0"
INFLUXDB_TAG: "2.5.0" INFLUXDB_TAG: "2.5.0"
TDENGINE_TAG: "3.0.2.4" TDENGINE_TAG: "3.0.2.4"
MINIO_TAG: "RELEASE.2023-03-20T20-16-18Z"
PROFILE: ${{ matrix.profile }} PROFILE: ${{ matrix.profile }}
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }}

View File

@ -20,6 +20,8 @@
-behaviour(emqx_config_handler). -behaviour(emqx_config_handler).
-include_lib("emqx/include/logger.hrl").
%% Accessors %% Accessors
-export([storage/0]). -export([storage/0]).
-export([gc_interval/1]). -export([gc_interval/1]).

View File

@ -49,7 +49,7 @@
transfer := transfer() transfer := transfer()
}. }.
-define(S3_PROFILE_ID, <<"emqx_ft_storage_exporter_s3">>). -define(S3_PROFILE_ID, ?MODULE).
-define(FILEMETA_VSN, <<"1">>). -define(FILEMETA_VSN, <<"1">>).
-define(S3_LIST_LIMIT, 500). -define(S3_LIST_LIMIT, 500).
@ -66,6 +66,7 @@ start_export(_Options, Transfer, Filemeta) ->
}, },
case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of case emqx_s3:start_uploader(?S3_PROFILE_ID, Options) of
{ok, Pid} -> {ok, Pid} ->
true = erlang:link(Pid),
{ok, #{filemeta => Filemeta, pid => Pid}}; {ok, #{filemeta => Filemeta, pid => Pid}};
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
@ -151,7 +152,7 @@ list_key_info(Client, Options, Marker, Acc) ->
ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker, ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker,
case emqx_s3_client:list(Client, ListOptions) of case emqx_s3_client:list(Client, ListOptions) of
{ok, Result} -> {ok, Result} ->
?SLOG(warning, #{msg => "list_key_info", result => Result}), ?SLOG(debug, #{msg => "list_key_info", result => Result}),
KeyInfos = proplists:get_value(contents, Result, []), KeyInfos = proplists:get_value(contents, Result, []),
case proplists:get_value(is_truncated, Result, false) of case proplists:get_value(is_truncated, Result, false) of
true -> true ->

View File

@ -16,10 +16,11 @@
-export_type([ -export_type([
profile_id/0, profile_id/0,
profile_config/0 profile_config/0,
acl/0
]). ]).
-type profile_id() :: term(). -type profile_id() :: atom() | binary().
-type acl() :: -type acl() ::
private private
@ -30,9 +31,9 @@
| bucket_owner_full_control. | bucket_owner_full_control.
-type transport_options() :: #{ -type transport_options() :: #{
headers => map(),
connect_timeout => pos_integer(), connect_timeout => pos_integer(),
enable_pipelining => pos_integer(), enable_pipelining => pos_integer(),
headers => map(),
max_retries => pos_integer(), max_retries => pos_integer(),
pool_size => pos_integer(), pool_size => pos_integer(),
pool_type => atom(), pool_type => atom(),
@ -51,12 +52,14 @@
transport_options => transport_options() transport_options => transport_options()
}. }.
-define(IS_PROFILE_ID(ProfileId), (is_atom(ProfileId) orelse is_binary(ProfileId))).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()). -spec start_profile(profile_id(), profile_config()) -> ok_or_error(term()).
start_profile(ProfileId, ProfileConfig) -> start_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of case emqx_s3_sup:start_profile(ProfileId, ProfileConfig) of
{ok, _} -> {ok, _} ->
ok; ok;
@ -65,21 +68,21 @@ start_profile(ProfileId, ProfileConfig) ->
end. end.
-spec stop_profile(profile_id()) -> ok_or_error(term()). -spec stop_profile(profile_id()) -> ok_or_error(term()).
stop_profile(ProfileId) -> stop_profile(ProfileId) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_sup:stop_profile(ProfileId). emqx_s3_sup:stop_profile(ProfileId).
-spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()). -spec update_profile(profile_id(), profile_config()) -> ok_or_error(term()).
update_profile(ProfileId, ProfileConfig) -> update_profile(ProfileId, ProfileConfig) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig). emqx_s3_profile_conf:update_config(ProfileId, ProfileConfig).
-spec start_uploader(profile_id(), emqx_s3_uploader:opts()) -> -spec start_uploader(profile_id(), emqx_s3_uploader:opts()) ->
supervisor:start_ret() | {error, profile_not_found}. supervisor:start_ret() | {error, profile_not_found}.
start_uploader(ProfileId, Opts) -> start_uploader(ProfileId, Opts) when ?IS_PROFILE_ID(ProfileId) ->
emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts). emqx_s3_profile_uploader_sup:start_uploader(ProfileId, Opts).
-spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) -> -spec with_client(profile_id(), fun((emqx_s3_client:client()) -> Result)) ->
{error, profile_not_found} | Result. {error, profile_not_found} | Result.
with_client(ProfileId, Fun) when is_function(Fun, 1) -> with_client(ProfileId, Fun) when is_function(Fun, 1) andalso ?IS_PROFILE_ID(ProfileId) ->
case emqx_s3_profile_conf:checkout_config(ProfileId) of case emqx_s3_profile_conf:checkout_config(ProfileId) of
{ok, ClientConfig, _UploadConfig} -> {ok, ClientConfig, _UploadConfig} ->
try try

View File

@ -0,0 +1,13 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-define(VIA_GPROC(Id), {via, gproc, {n, l, Id}}).
-define(SAFE_CALL_VIA_GPROC(Id, Message, Timeout, NoProcError),
try gen_server:call(?VIA_GPROC(Id), Message, Timeout) of
Result -> Result
catch
exit:{noproc, _} -> {error, NoProcError}
end
).

View File

@ -31,22 +31,14 @@
headers/0 headers/0
]). ]).
-type s3_bucket_acl() :: -type headers() :: #{binary() | string() => iodata()}.
private
| public_read
| public_read_write
| authenticated_read
| bucket_owner_read
| bucket_owner_full_control.
-type headers() :: #{binary() | string() => binary() | string()}.
-type key() :: string(). -type key() :: string().
-type part_number() :: non_neg_integer(). -type part_number() :: non_neg_integer().
-type upload_id() :: string(). -type upload_id() :: string().
-type etag() :: string(). -type etag() :: string().
-type upload_options() :: list({acl, s3_bucket_acl()}). -type upload_options() :: list({acl, emqx_s3:acl()}).
-opaque client() :: #{ -opaque client() :: #{
aws_config := aws_config(), aws_config := aws_config(),
@ -61,11 +53,11 @@
port := part_number(), port := part_number(),
bucket := string(), bucket := string(),
headers := headers(), headers := headers(),
acl := s3_bucket_acl(), acl := emqx_s3:acl(),
url_expire_time := pos_integer(), url_expire_time := pos_integer(),
access_key_id := string() | undefined, access_key_id := string() | undefined,
secret_access_key := string() | undefined, secret_access_key := string() | undefined,
http_pool := ecpool:pool_name(), http_pool := ehttpc:pool_name(),
request_timeout := timeout() request_timeout := timeout()
}. }.
@ -97,7 +89,7 @@ put_object(
Value Value
) -> ) ->
AllHeaders = join_headers(Headers, SpecialHeaders), AllHeaders = join_headers(Headers, SpecialHeaders),
try erlcloud_s3:put_object(Bucket, key(Key), Value, Options, AllHeaders, AwsConfig) of try erlcloud_s3:put_object(Bucket, erlcloud_key(Key), Value, Options, AllHeaders, AwsConfig) of
Props when is_list(Props) -> Props when is_list(Props) ->
ok ok
catch catch
@ -117,9 +109,9 @@ start_multipart(
Key Key
) -> ) ->
AllHeaders = join_headers(Headers, SpecialHeaders), AllHeaders = join_headers(Headers, SpecialHeaders),
case erlcloud_s3:start_multipart(Bucket, key(Key), Options, AllHeaders, AwsConfig) of case erlcloud_s3:start_multipart(Bucket, erlcloud_key(Key), Options, AllHeaders, AwsConfig) of
{ok, Props} -> {ok, Props} ->
{ok, proplists:get_value('uploadId', Props)}; {ok, response_property('uploadId', Props)};
{error, Reason} -> {error, Reason} ->
?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
{error, Reason} {error, Reason}
@ -135,10 +127,12 @@ upload_part(
Value Value
) -> ) ->
case case
erlcloud_s3:upload_part(Bucket, key(Key), UploadId, PartNumber, Value, Headers, AwsConfig) erlcloud_s3:upload_part(
Bucket, erlcloud_key(Key), UploadId, PartNumber, Value, Headers, AwsConfig
)
of of
{ok, Props} -> {ok, Props} ->
{ok, proplists:get_value(etag, Props)}; {ok, response_property(etag, Props)};
{error, Reason} -> {error, Reason} ->
?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}), ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}),
{error, Reason} {error, Reason}
@ -151,7 +145,11 @@ complete_multipart(
UploadId, UploadId,
ETags ETags
) -> ) ->
case erlcloud_s3:complete_multipart(Bucket, key(Key), UploadId, ETags, Headers, AwsConfig) of case
erlcloud_s3:complete_multipart(
Bucket, erlcloud_key(Key), UploadId, ETags, Headers, AwsConfig
)
of
ok -> ok ->
ok; ok;
{error, Reason} -> {error, Reason} ->
@ -161,7 +159,7 @@ complete_multipart(
-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
case erlcloud_s3:abort_multipart(Bucket, key(Key), UploadId, [], Headers, AwsConfig) of case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of
ok -> ok ->
ok; ok;
{error, Reason} -> {error, Reason} ->
@ -181,7 +179,7 @@ list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
-spec uri(client(), key()) -> iodata(). -spec uri(client(), key()) -> iodata().
uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) -> uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
erlcloud_s3:make_get_url(ExpireTime, Bucket, key(Key), AwsConfig). erlcloud_s3:make_get_url(ExpireTime, Bucket, erlcloud_key(Key), AwsConfig).
-spec format(client()) -> term(). -spec format(client()) -> term().
format(#{aws_config := AwsConfig} = Client) -> format(#{aws_config := AwsConfig} = Client) ->
@ -197,7 +195,7 @@ upload_options(Config) ->
]. ].
headers(#{headers := Headers}) -> headers(#{headers := Headers}) ->
string_headers(maps:to_list(Headers)); headers_user_to_erlcloud_request(Headers);
headers(#{}) -> headers(#{}) ->
[]. [].
@ -230,7 +228,9 @@ aws_config(#{
request_fun(HttpPool) -> request_fun(HttpPool) ->
fun(Url, Method, Headers, Body, Timeout, _Config) -> fun(Url, Method, Headers, Body, Timeout, _Config) ->
with_path_and_query_only(Url, fun(PathQuery) -> with_path_and_query_only(Url, fun(PathQuery) ->
Request = make_request(Method, PathQuery, binary_headers(Headers), Body), Request = make_request(
Method, PathQuery, headers_erlcloud_request_to_ehttpc(Headers), Body
),
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "s3_ehttpc_request", msg => "s3_ehttpc_request",
timeout => Timeout, timeout => Timeout,
@ -243,6 +243,13 @@ request_fun(HttpPool) ->
end. end.
ehttpc_request(HttpPool, Method, Request, Timeout) -> ehttpc_request(HttpPool, Method, Request, Timeout) ->
?SLOG(debug, #{
msg => "s3_ehttpc_request",
timeout => Timeout,
pool => HttpPool,
method => Method,
request => format_request(Request)
}),
try ehttpc:request(HttpPool, Method, Request, Timeout) of try ehttpc:request(HttpPool, Method, Request, Timeout) of
{ok, StatusCode, RespHeaders} -> {ok, StatusCode, RespHeaders} ->
?SLOG(debug, #{ ?SLOG(debug, #{
@ -250,7 +257,9 @@ ehttpc_request(HttpPool, Method, Request, Timeout) ->
status_code => StatusCode, status_code => StatusCode,
headers => RespHeaders headers => RespHeaders
}), }),
{ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), undefined}}; {ok, {
{StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), undefined
}};
{ok, StatusCode, RespHeaders, RespBody} -> {ok, StatusCode, RespHeaders, RespBody} ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "s3_ehttpc_request_ok", msg => "s3_ehttpc_request_ok",
@ -258,7 +267,9 @@ ehttpc_request(HttpPool, Method, Request, Timeout) ->
headers => RespHeaders, headers => RespHeaders,
body => RespBody body => RespBody
}), }),
{ok, {{StatusCode, undefined}, erlcloud_string_headers(RespHeaders), RespBody}}; {ok, {
{StatusCode, undefined}, headers_ehttpc_to_erlcloud_response(RespHeaders), RespBody
}};
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "s3_ehttpc_request_fail", msg => "s3_ehttpc_request_fail",
@ -290,10 +301,10 @@ ehttpc_request(HttpPool, Method, Request, Timeout) ->
end. end.
-define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)). -define(IS_BODY_EMPTY(Body), (Body =:= undefined orelse Body =:= <<>>)).
-define(NEEDS_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)). -define(NEEDS_NO_BODY(Method), (Method =:= get orelse Method =:= head orelse Method =:= delete)).
make_request(Method, PathQuery, Headers, Body) when make_request(Method, PathQuery, Headers, Body) when
?IS_BODY_EMPTY(Body) andalso ?NEEDS_BODY(Method) ?IS_BODY_EMPTY(Body) andalso ?NEEDS_NO_BODY(Method)
-> ->
{PathQuery, Headers}; {PathQuery, Headers};
make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) -> make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) ->
@ -301,7 +312,8 @@ make_request(_Method, PathQuery, Headers, Body) when ?IS_BODY_EMPTY(Body) ->
make_request(_Method, PathQuery, Headers, Body) -> make_request(_Method, PathQuery, Headers, Body) ->
{PathQuery, Headers, Body}. {PathQuery, Headers, Body}.
format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>}. format_request({PathQuery, Headers, _Body}) -> {PathQuery, Headers, <<"...">>};
format_request({PathQuery, Headers}) -> {PathQuery, Headers}.
with_path_and_query_only(Url, Fun) -> with_path_and_query_only(Url, Fun) ->
case string:split(Url, "//", leading) of case string:split(Url, "//", leading) of
@ -316,17 +328,41 @@ with_path_and_query_only(Url, Fun) ->
{error, {invalid_url, Url}} {error, {invalid_url, Url}}
end. end.
string_headers(Headers) -> %% We need some header conversions to tie the emqx_s3, erlcloud and ehttpc APIs together.
[{to_list_string(K), to_list_string(V)} || {K, V} <- Headers].
erlcloud_string_headers(Headers) -> %% The request header flow is:
[{string:to_lower(K), V} || {K, V} <- string_headers(Headers)].
binary_headers(Headers) -> %% UserHeaders -> [emqx_s3_client API] -> ErlcloudRequestHeaders0 ->
[{to_binary(K), V} || {K, V} <- Headers]. %% -> [erlcloud API] -> ErlcloudRequestHeaders1 -> [emqx_s3_client injected request_fun] ->
%% -> EhttpcRequestHeaders -> [ehttpc API]
join_headers(Headers, SpecialHeaders) -> %% The response header flow is:
Headers ++ string_headers(maps:to_list(SpecialHeaders)).
%% [ehttpc API] -> EhttpcResponseHeaders -> [emqx_s3_client injected request_fun] ->
%% -> ErlcloudResponseHeaders0 -> [erlcloud API] -> [emqx_s3_client API]
%% UserHeders (emqx_s3 API headers) are maps with string/binary keys.
%% ErlcloudRequestHeaders are lists of tuples with string keys and iodata values
%% ErlcloudResponseHeders are lists of tuples with lower case string keys and iodata values.
%% EhttpcHeaders are lists of tuples with binary keys and iodata values.
%% Users provide headers as a map, but erlcloud expects a list of tuples with string keys and values.
headers_user_to_erlcloud_request(UserHeaders) ->
[{to_list_string(K), V} || {K, V} <- maps:to_list(UserHeaders)].
%% Ehttpc returns operates on headers as a list of tuples with binary keys.
%% Erlcloud expects a list of tuples with string values and lowcase string keys
%% from the underlying http library.
headers_ehttpc_to_erlcloud_response(EhttpcHeaders) ->
[{string:to_lower(to_list_string(K)), to_list_string(V)} || {K, V} <- EhttpcHeaders].
%% Ehttpc expects a list of tuples with binary keys.
%% Erlcloud provides a list of tuples with string keys.
headers_erlcloud_request_to_ehttpc(ErlcloudHeaders) ->
[{to_binary(K), V} || {K, V} <- ErlcloudHeaders].
join_headers(ErlcloudHeaders, UserSpecialHeaders) ->
ErlcloudHeaders ++ headers_user_to_erlcloud_request(UserSpecialHeaders).
to_binary(Val) when is_list(Val) -> list_to_binary(Val); to_binary(Val) when is_list(Val) -> list_to_binary(Val);
to_binary(Val) when is_binary(Val) -> Val. to_binary(Val) when is_binary(Val) -> Val.
@ -336,5 +372,19 @@ to_list_string(Val) when is_binary(Val) ->
to_list_string(Val) when is_list(Val) -> to_list_string(Val) when is_list(Val) ->
Val. Val.
key(Characters) -> erlcloud_key(Characters) ->
binary_to_list(unicode:characters_to_binary(Characters)). binary_to_list(unicode:characters_to_binary(Characters)).
response_property(Name, Props) ->
case proplists:get_value(Name, Props) of
undefined ->
%% This schould not happen for valid S3 implementations
?SLOG(error, #{
msg => "missing_s3_response_property",
name => Name,
props => Props
}),
error({missing_s3_response_property, Name});
Value ->
Value
end.

View File

@ -11,6 +11,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("src/emqx_s3.hrl").
-export([ -export([
start_link/2, start_link/2,
child_spec/2 child_spec/2
@ -47,6 +49,10 @@
-define(DEFAULT_HTTP_POOL_TIMEOUT, 60000). -define(DEFAULT_HTTP_POOL_TIMEOUT, 60000).
-define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000). -define(DEAFULT_HTTP_POOL_CLEANUP_INTERVAL, 60000).
-define(SAFE_CALL_VIA_GPROC(ProfileId, Message, Timeout),
?SAFE_CALL_VIA_GPROC(id(ProfileId), Message, Timeout, profile_not_found)
).
-spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec(). -spec child_spec(emqx_s3:profile_id(), emqx_s3:profile_config()) -> supervisor:child_spec().
child_spec(ProfileId, ProfileConfig) -> child_spec(ProfileId, ProfileConfig) ->
#{ #{
@ -60,7 +66,7 @@ child_spec(ProfileId, ProfileConfig) ->
-spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> gen_server:start_ret(). -spec start_link(emqx_s3:profile_id(), emqx_s3:profile_config()) -> gen_server:start_ret().
start_link(ProfileId, ProfileConfig) -> start_link(ProfileId, ProfileConfig) ->
gen_server:start_link(?MODULE, [ProfileId, ProfileConfig], []). gen_server:start_link(?VIA_GPROC(id(ProfileId)), ?MODULE, [ProfileId, ProfileConfig], []).
-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config()) -> ok_or_error(term()). -spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config()) -> ok_or_error(term()).
update_config(ProfileId, ProfileConfig) -> update_config(ProfileId, ProfileConfig) ->
@ -69,12 +75,7 @@ update_config(ProfileId, ProfileConfig) ->
-spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config(), timeout()) -> -spec update_config(emqx_s3:profile_id(), emqx_s3:profile_config(), timeout()) ->
ok_or_error(term()). ok_or_error(term()).
update_config(ProfileId, ProfileConfig, Timeout) -> update_config(ProfileId, ProfileConfig, Timeout) ->
case gproc:where({n, l, id(ProfileId)}) of ?SAFE_CALL_VIA_GPROC(ProfileId, {update_config, ProfileConfig}, Timeout).
undefined ->
{error, profile_not_found};
Pid ->
gen_server:call(Pid, {update_config, ProfileConfig}, Timeout)
end.
-spec checkout_config(emqx_s3:profile_id()) -> -spec checkout_config(emqx_s3:profile_id()) ->
{ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
@ -84,12 +85,7 @@ checkout_config(ProfileId) ->
-spec checkout_config(emqx_s3:profile_id(), timeout()) -> -spec checkout_config(emqx_s3:profile_id(), timeout()) ->
{ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}. {ok, emqx_s3_client:config(), emqx_s3_uploader:config()} | {error, profile_not_found}.
checkout_config(ProfileId, Timeout) -> checkout_config(ProfileId, Timeout) ->
case gproc:where({n, l, id(ProfileId)}) of ?SAFE_CALL_VIA_GPROC(ProfileId, {checkout_config, self()}, Timeout).
undefined ->
{error, profile_not_found};
Pid ->
gen_server:call(Pid, {checkout_config, self()}, Timeout)
end.
-spec checkin_config(emqx_s3:profile_id()) -> ok | {error, profile_not_found}. -spec checkin_config(emqx_s3:profile_id()) -> ok | {error, profile_not_found}.
checkin_config(ProfileId) -> checkin_config(ProfileId) ->
@ -97,12 +93,7 @@ checkin_config(ProfileId) ->
-spec checkin_config(emqx_s3:profile_id(), timeout()) -> ok | {error, profile_not_found}. -spec checkin_config(emqx_s3:profile_id(), timeout()) -> ok | {error, profile_not_found}.
checkin_config(ProfileId, Timeout) -> checkin_config(ProfileId, Timeout) ->
case gproc:where({n, l, id(ProfileId)}) of ?SAFE_CALL_VIA_GPROC(ProfileId, {checkin_config, self()}, Timeout).
undefined ->
{error, profile_not_found};
Pid ->
gen_server:call(Pid, {checkin_config, self()}, Timeout)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -110,10 +101,9 @@ checkin_config(ProfileId, Timeout) ->
init([ProfileId, ProfileConfig]) -> init([ProfileId, ProfileConfig]) ->
_ = process_flag(trap_exit, true), _ = process_flag(trap_exit, true),
ok = cleanup_orphaned_pools(ProfileId), ok = cleanup_profile_pools(ProfileId),
case start_http_pool(ProfileId, ProfileConfig) of case start_http_pool(ProfileId, ProfileConfig) of
{ok, PoolName} -> {ok, PoolName} ->
true = gproc:reg({n, l, id(ProfileId)}, ignored),
HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig), HttpPoolCleanupInterval = http_pool_cleanup_interval(ProfileConfig),
{ok, #{ {ok, #{
profile_id => ProfileId, profile_id => ProfileId,
@ -188,12 +178,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #{profile_id := ProfileId}) -> terminate(_Reason, #{profile_id := ProfileId}) ->
lists:foreach( cleanup_profile_pools(ProfileId).
fun(PoolName) ->
ok = stop_http_pool(ProfileId, PoolName)
end,
emqx_s3_profile_http_pools:all(ProfileId)
).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -263,12 +248,14 @@ update_http_pool(ProfileId, ProfileConfig, #{pool_name := OldPoolName} = State)
pool_name(ProfileId) -> pool_name(ProfileId) ->
iolist_to_binary([ iolist_to_binary([
<<"s3-http-">>, <<"s3-http-">>,
ProfileId, profile_id_to_bin(ProfileId),
<<"-">>, <<"-">>,
integer_to_binary(erlang:system_time(millisecond)), integer_to_binary(erlang:system_time(millisecond)),
<<"-">>, <<"-">>,
integer_to_binary(erlang:unique_integer([positive])) integer_to_binary(erlang:unique_integer([positive]))
]). ]).
profile_id_to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
profile_id_to_bin(Bin) when is_binary(Bin) -> Bin.
old_http_config(#{profile_config := ProfileConfig}) -> http_config(ProfileConfig). old_http_config(#{profile_config := ProfileConfig}) -> http_config(ProfileConfig).
@ -278,7 +265,7 @@ set_old_pool_outdated(#{
_ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout), _ = emqx_s3_profile_http_pools:set_outdated(ProfileId, PoolName, HttpPoolTimeout),
ok. ok.
cleanup_orphaned_pools(ProfileId) -> cleanup_profile_pools(ProfileId) ->
lists:foreach( lists:foreach(
fun(PoolName) -> fun(PoolName) ->
ok = stop_http_pool(ProfileId, PoolName) ok = stop_http_pool(ProfileId, PoolName)

View File

@ -7,6 +7,9 @@
-behaviour(supervisor). -behaviour(supervisor).
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include("src/emqx_s3.hrl").
-export([ -export([
start_link/1, start_link/1,
@ -23,7 +26,7 @@
-spec start_link(emqx_s3:profile_id()) -> supervisor:start_ret(). -spec start_link(emqx_s3:profile_id()) -> supervisor:start_ret().
start_link(ProfileId) -> start_link(ProfileId) ->
supervisor:start_link(?MODULE, [ProfileId]). supervisor:start_link(?VIA_GPROC(id(ProfileId)), ?MODULE, [ProfileId]).
-spec child_spec(emqx_s3:profile_id()) -> supervisor:child_spec(). -spec child_spec(emqx_s3:profile_id()) -> supervisor:child_spec().
child_spec(ProfileId) -> child_spec(ProfileId) ->
@ -43,10 +46,10 @@ id(ProfileId) ->
-spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) -> -spec start_uploader(emqx_s3:profile_id(), emqx_s3_uploader:opts()) ->
supervisor:start_ret() | {error, profile_not_found}. supervisor:start_ret() | {error, profile_not_found}.
start_uploader(ProfileId, Opts) -> start_uploader(ProfileId, Opts) ->
Id = id(ProfileId), try supervisor:start_child(?VIA_GPROC(id(ProfileId)), [Opts]) of
case gproc:where({n, l, Id}) of Result -> Result
undefined -> {error, profile_not_found}; catch
Pid -> supervisor:start_child(Pid, [Opts]) exit:{noproc, _} -> {error, profile_not_found}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -54,7 +57,6 @@ start_uploader(ProfileId, Opts) ->
%%------------------------------------------------------------------- %%-------------------------------------------------------------------
init([ProfileId]) -> init([ProfileId]) ->
true = gproc:reg({n, l, id(ProfileId)}, ignored),
SupFlags = #{ SupFlags = #{
strategy => simple_one_for_one, strategy => simple_one_for_one,
intensity => 10, intensity => 10,

View File

@ -56,13 +56,15 @@
%% 5GB %% 5GB
-define(DEFAULT_MAX_PART_SIZE, 5368709120). -define(DEFAULT_MAX_PART_SIZE, 5368709120).
-define(DEFAULT_TIMEOUT, 30000).
-spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret(). -spec start_link(emqx_s3:profile_id(), opts()) -> gen_statem:start_ret().
start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) -> start_link(ProfileId, #{key := Key} = Opts) when is_list(Key) ->
gen_statem:start_link(?MODULE, [ProfileId, Opts], []). gen_statem:start_link(?MODULE, [ProfileId, Opts], []).
-spec write(pid(), iodata()) -> ok_or_error(term()). -spec write(pid(), iodata()) -> ok_or_error(term()).
write(Pid, WriteData) -> write(Pid, WriteData) ->
write(Pid, WriteData, infinity). write(Pid, WriteData, ?DEFAULT_TIMEOUT).
-spec write(pid(), iodata(), timeout()) -> ok_or_error(term()). -spec write(pid(), iodata(), timeout()) -> ok_or_error(term()).
write(Pid, WriteData, Timeout) -> write(Pid, WriteData, Timeout) ->
@ -70,7 +72,7 @@ write(Pid, WriteData, Timeout) ->
-spec complete(pid()) -> ok_or_error(term()). -spec complete(pid()) -> ok_or_error(term()).
complete(Pid) -> complete(Pid) ->
complete(Pid, infinity). complete(Pid, ?DEFAULT_TIMEOUT).
-spec complete(pid(), timeout()) -> ok_or_error(term()). -spec complete(pid(), timeout()) -> ok_or_error(term()).
complete(Pid, Timeout) -> complete(Pid, Timeout) ->
@ -78,7 +80,7 @@ complete(Pid, Timeout) ->
-spec abort(pid()) -> ok_or_error(term()). -spec abort(pid()) -> ok_or_error(term()).
abort(Pid) -> abort(Pid) ->
abort(Pid, infinity). abort(Pid, ?DEFAULT_TIMEOUT).
-spec abort(pid(), timeout()) -> ok_or_error(term()). -spec abort(pid(), timeout()) -> ok_or_error(term()).
abort(Pid, Timeout) -> abort(Pid, Timeout) ->
@ -237,7 +239,7 @@ upload_part(
etags := ETags etags := ETags
} = Data } = Data
) -> ) ->
case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, lists:reverse(Buffer)) of case emqx_s3_client:upload_part(Client, Key, UploadId, PartNumber, Buffer) of
{ok, ETag} -> {ok, ETag} ->
NewData = Data#{ NewData = Data#{
buffer => [], buffer => [],
@ -298,7 +300,7 @@ put_object(
headers := Headers headers := Headers
} }
) -> ) ->
case emqx_s3_client:put_object(Client, Headers, Key, lists:reverse(Buffer)) of case emqx_s3_client:put_object(Client, Headers, Key, Buffer) of
ok -> ok ->
ok; ok;
{error, _} = Error -> {error, _} = Error ->
@ -308,7 +310,7 @@ put_object(
-spec append_buffer(data(), iodata()) -> data(). -spec append_buffer(data(), iodata()) -> data().
append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) -> append_buffer(#{buffer := Buffer, buffer_size := BufferSize} = Data, WriteData) ->
Data#{ Data#{
buffer => [WriteData | Buffer], buffer => [Buffer, WriteData],
buffer_size => BufferSize + iolist_size(WriteData) buffer_size => BufferSize + iolist_size(WriteData)
}. }.