From 69c7f858af319da1244cf597d344aa5ba25299b4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 22 Feb 2024 16:12:29 +0100 Subject: [PATCH] fix(s3): try to obtain credentials with default lhttpc Otherwise `emqx_s3_client` will try to do it later by contacting the S3 endpoint instead of the metadata server. --- apps/emqx_s3/src/emqx_s3_client.erl | 110 +++++++++++++++------ apps/emqx_s3/test/emqx_s3_client_SUITE.erl | 23 ++++- 2 files changed, 100 insertions(+), 33 deletions(-) diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index e02134dc1..ec7522fe7 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -103,7 +103,7 @@ put_object(Client, Key, Value) -> -spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()). put_object( - #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts, Content @@ -118,11 +118,13 @@ put_object( error:{aws_error, Reason} -> ?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +put_object(#{aws_config := {error, Reason}}, _Key, _UploadOpts, _Content) -> + {error, {config_error, Reason}}. -spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()). start_multipart( - #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts ) -> @@ -135,12 +137,14 @@ start_multipart( {error, Reason} -> ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +start_multipart(#{aws_config := {error, Reason}}, _Key, _UploadOpts) -> + {error, {config_error, Reason}}. -spec upload_part(client(), key(), upload_id(), part_number(), iodata()) -> ok_or_error(etag(), term()). upload_part( - #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, Key, UploadId, PartNumber, @@ -156,11 +160,13 @@ upload_part( {error, Reason} -> ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +upload_part(#{aws_config := {error, Reason}}, _Key, _UploadId, _PartNumber, _Value) -> + {error, {config_error, Reason}}. -spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()). complete_multipart( - #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, Key, UploadId, ETags @@ -175,27 +181,37 @@ complete_multipart( {error, Reason} -> ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +complete_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId, _ETags) -> + {error, {config_error, Reason}}. -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). -abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> +abort_multipart( + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, + Key, + UploadId +) -> case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of ok -> ok; {error, Reason} -> ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +abort_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId) -> + {error, {config_error, Reason}}. -spec list(client(), s3_options()) -> ok_or_error(proplists:proplist(), term()). -list(#{bucket := Bucket, aws_config := AwsConfig}, Options) -> +list(#{bucket := Bucket, aws_config := AwsConfig = #aws_config{}}, Options) -> try erlcloud_s3:list_objects(Bucket, Options, AwsConfig) of Result -> {ok, Result} catch error:{aws_error, Reason} -> ?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}), {error, Reason} - end. + end; +list(#{aws_config := {error, Reason}}, _Options) -> + {error, {config_error, Reason}}. -spec uri(client(), key()) -> iodata(). uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) -> @@ -221,35 +237,67 @@ headers(#{headers := Headers}) -> headers(#{}) -> []. -aws_config(#{ - scheme := Scheme, - host := Host, - port := Port, - access_key_id := AccessKeyId, - secret_access_key := SecretAccessKey, - http_pool := HttpPool, - pool_type := PoolType, - request_timeout := Timeout, - max_retries := MaxRetries -}) -> - #aws_config{ +aws_config( + Config = #{ + scheme := Scheme, + host := Host, + port := Port, + request_timeout := Timeout + } +) -> + ensure_aws_credentials(Config, #aws_config{ s3_scheme = Scheme, s3_host = Host, s3_port = Port, s3_bucket_access_method = path, s3_bucket_after_host = true, - access_key_id = AccessKeyId, - secret_access_key = emqx_secret:unwrap(SecretAccessKey), - - http_client = request_fun( - HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) - ), - %% This value will be transparently passed to ehttpc timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT), %% We rely on retry mechanism of ehttpc retry_num = 1 + }). + +ensure_aws_credentials( + Config = #{ + access_key_id := undefined, + secret_access_key := undefined + }, + AWSConfigIn +) -> + %% NOTE + %% Try to fetch credentials from the runtime environment (i.e. EC2 instance metadata). + %% Doing it before changing HTTP client to the one pinned to the S3 hostname. + case erlcloud_aws:update_config(AWSConfigIn) of + {ok, AWSConfig} -> + ensure_ehttpc_client(Config, AWSConfig); + {error, Reason} -> + {error, {failed_to_obtain_credentials, Reason}} + end; +ensure_aws_credentials( + Config = #{ + access_key_id := AccessKeyId, + secret_access_key := SecretAccessKey + }, + AWSConfigIn +) -> + ensure_ehttpc_client(Config, AWSConfigIn#aws_config{ + access_key_id = AccessKeyId, + secret_access_key = emqx_secret:unwrap(SecretAccessKey) + }). + +ensure_ehttpc_client( + #{ + http_pool := HttpPool, + pool_type := PoolType, + max_retries := MaxRetries + }, + AWSConfig +) -> + AWSConfig#aws_config{ + http_client = request_fun( + HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) + ) }. -spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun(). diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index 619a09e76..c186d1296 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -145,18 +145,37 @@ t_extra_headers(Config0) -> Client = client(Config), Opts = #{acl => public_read}, - Data = #{foo => bar}, + Data = #{<<"foo">> => <<"bar">>}, ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)), Url = emqx_s3_client:uri(Client, Key), {ok, {{_StatusLine, 200, "OK"}, _Headers, Content}} = httpc:request(get, {Url, []}, [{ssl, [{verify, verify_none}]}], []), - ?_assertEqual( + ?assertEqual( Data, emqx_utils_json:decode(Content) ). +t_no_credentials(Config) -> + Bucket = ?config(bucket, Config), + ProfileConfig = maps:merge( + profile_config(Config), + #{ + access_key_id => undefined, + secret_access_key => undefined + } + ), + ClientConfig = emqx_s3_profile_conf:client_config( + ProfileConfig, + ?config(ehttpc_pool_name, Config) + ), + Client = emqx_s3_client:create(Bucket, ClientConfig), + ?assertMatch( + {error, {config_error, {failed_to_obtain_credentials, _}}}, + emqx_s3_client:put_object(Client, ?config(key, Config), <<>>) + ). + %%-------------------------------------------------------------------- %% Helpers %%--------------------------------------------------------------------