fix(s3): try to obtain credentials with default lhttpc

Otherwise `emqx_s3_client` will try to do it later by contacting the S3
endpoint instead of the metadata server.
This commit is contained in:
Andrew Mayorov 2024-02-22 16:12:29 +01:00
parent ae8f59979d
commit 69c7f858af
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 100 additions and 33 deletions

View File

@ -103,7 +103,7 @@ put_object(Client, Key, Value) ->
-spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()).
put_object(
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}},
Key,
UploadOpts,
Content
@ -118,11 +118,13 @@ put_object(
error:{aws_error, Reason} ->
?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}),
{error, Reason}
end.
end;
put_object(#{aws_config := {error, Reason}}, _Key, _UploadOpts, _Content) ->
{error, {config_error, Reason}}.
-spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()).
start_multipart(
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
#{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}},
Key,
UploadOpts
) ->
@ -135,12 +137,14 @@ start_multipart(
{error, Reason} ->
?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
{error, Reason}
end.
end;
start_multipart(#{aws_config := {error, Reason}}, _Key, _UploadOpts) ->
{error, {config_error, Reason}}.
-spec upload_part(client(), key(), upload_id(), part_number(), iodata()) ->
ok_or_error(etag(), term()).
upload_part(
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
Key,
UploadId,
PartNumber,
@ -156,11 +160,13 @@ upload_part(
{error, Reason} ->
?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}),
{error, Reason}
end.
end;
upload_part(#{aws_config := {error, Reason}}, _Key, _UploadId, _PartNumber, _Value) ->
{error, {config_error, Reason}}.
-spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()).
complete_multipart(
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
Key,
UploadId,
ETags
@ -175,27 +181,37 @@ complete_multipart(
{error, Reason} ->
?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}),
{error, Reason}
end.
end;
complete_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId, _ETags) ->
{error, {config_error, Reason}}.
-spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
abort_multipart(
#{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
Key,
UploadId
) ->
case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of
ok ->
ok;
{error, Reason} ->
?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),
{error, Reason}
end.
end;
abort_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId) ->
{error, {config_error, Reason}}.
-spec list(client(), s3_options()) -> ok_or_error(proplists:proplist(), term()).
list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
list(#{bucket := Bucket, aws_config := AwsConfig = #aws_config{}}, Options) ->
try erlcloud_s3:list_objects(Bucket, Options, AwsConfig) of
Result -> {ok, Result}
catch
error:{aws_error, Reason} ->
?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}),
{error, Reason}
end.
end;
list(#{aws_config := {error, Reason}}, _Options) ->
{error, {config_error, Reason}}.
-spec uri(client(), key()) -> iodata().
uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
@ -221,35 +237,67 @@ headers(#{headers := Headers}) ->
headers(#{}) ->
[].
aws_config(#{
scheme := Scheme,
host := Host,
port := Port,
access_key_id := AccessKeyId,
secret_access_key := SecretAccessKey,
http_pool := HttpPool,
pool_type := PoolType,
request_timeout := Timeout,
max_retries := MaxRetries
}) ->
#aws_config{
aws_config(
Config = #{
scheme := Scheme,
host := Host,
port := Port,
request_timeout := Timeout
}
) ->
ensure_aws_credentials(Config, #aws_config{
s3_scheme = Scheme,
s3_host = Host,
s3_port = Port,
s3_bucket_access_method = path,
s3_bucket_after_host = true,
access_key_id = AccessKeyId,
secret_access_key = emqx_secret:unwrap(SecretAccessKey),
http_client = request_fun(
HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)
),
%% This value will be transparently passed to ehttpc
timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT),
%% We rely on retry mechanism of ehttpc
retry_num = 1
}).
ensure_aws_credentials(
Config = #{
access_key_id := undefined,
secret_access_key := undefined
},
AWSConfigIn
) ->
%% NOTE
%% Try to fetch credentials from the runtime environment (i.e. EC2 instance metadata).
%% Doing it before changing HTTP client to the one pinned to the S3 hostname.
case erlcloud_aws:update_config(AWSConfigIn) of
{ok, AWSConfig} ->
ensure_ehttpc_client(Config, AWSConfig);
{error, Reason} ->
{error, {failed_to_obtain_credentials, Reason}}
end;
ensure_aws_credentials(
Config = #{
access_key_id := AccessKeyId,
secret_access_key := SecretAccessKey
},
AWSConfigIn
) ->
ensure_ehttpc_client(Config, AWSConfigIn#aws_config{
access_key_id = AccessKeyId,
secret_access_key = emqx_secret:unwrap(SecretAccessKey)
}).
ensure_ehttpc_client(
#{
http_pool := HttpPool,
pool_type := PoolType,
max_retries := MaxRetries
},
AWSConfig
) ->
AWSConfig#aws_config{
http_client = request_fun(
HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)
)
}.
-spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun().

View File

@ -145,18 +145,37 @@ t_extra_headers(Config0) ->
Client = client(Config),
Opts = #{acl => public_read},
Data = #{foo => bar},
Data = #{<<"foo">> => <<"bar">>},
ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)),
Url = emqx_s3_client:uri(Client, Key),
{ok, {{_StatusLine, 200, "OK"}, _Headers, Content}} =
httpc:request(get, {Url, []}, [{ssl, [{verify, verify_none}]}], []),
?_assertEqual(
?assertEqual(
Data,
emqx_utils_json:decode(Content)
).
t_no_credentials(Config) ->
Bucket = ?config(bucket, Config),
ProfileConfig = maps:merge(
profile_config(Config),
#{
access_key_id => undefined,
secret_access_key => undefined
}
),
ClientConfig = emqx_s3_profile_conf:client_config(
ProfileConfig,
?config(ehttpc_pool_name, Config)
),
Client = emqx_s3_client:create(Bucket, ClientConfig),
?assertMatch(
{error, {config_error, {failed_to_obtain_credentials, _}}},
emqx_s3_client:put_object(Client, ?config(key, Config), <<>>)
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------