diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 63d58e04c..4407222d5 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -105,14 +105,19 @@ on_stop(InstId, _State = #{pool_name := PoolName}) -> -spec on_get_status(_InstanceId :: resource_id(), state()) -> health_check_status(). on_get_status(_InstId, State = #{client_config := Config}) -> - try erlcloud_s3:list_buckets(emqx_s3_client:aws_config(Config)) of - Props when is_list(Props) -> - ?status_connected - catch - error:{aws_error, {http_error, _Code, _, Reason}} -> + case emqx_s3_client:aws_config(Config) of + {error, Reason} -> {?status_disconnected, State, Reason}; - error:{aws_error, {socket_error, Reason}} -> - {?status_disconnected, State, Reason} + AWSConfig -> + try erlcloud_s3:list_buckets(AWSConfig) of + Props when is_list(Props) -> + ?status_connected + catch + error:{aws_error, {http_error, _Code, _, Reason}} -> + {?status_disconnected, State, Reason}; + error:{aws_error, {socket_error, Reason}} -> + {?status_disconnected, State, Reason} + end end. -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) -> diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index e02134dc1..ec7522fe7 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -103,7 +103,7 @@ put_object(Client, Key, Value) -> -spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()). put_object( - #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts, Content @@ -118,11 +118,13 @@ put_object( error:{aws_error, Reason} -> ?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +put_object(#{aws_config := {error, Reason}}, _Key, _UploadOpts, _Content) -> + {error, {config_error, Reason}}. -spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()). start_multipart( - #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig}, + #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}}, Key, UploadOpts ) -> @@ -135,12 +137,14 @@ start_multipart( {error, Reason} -> ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +start_multipart(#{aws_config := {error, Reason}}, _Key, _UploadOpts) -> + {error, {config_error, Reason}}. -spec upload_part(client(), key(), upload_id(), part_number(), iodata()) -> ok_or_error(etag(), term()). upload_part( - #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, Key, UploadId, PartNumber, @@ -156,11 +160,13 @@ upload_part( {error, Reason} -> ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +upload_part(#{aws_config := {error, Reason}}, _Key, _UploadId, _PartNumber, _Value) -> + {error, {config_error, Reason}}. -spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()). complete_multipart( - #{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, Key, UploadId, ETags @@ -175,27 +181,37 @@ complete_multipart( {error, Reason} -> ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +complete_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId, _ETags) -> + {error, {config_error, Reason}}. -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()). -abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) -> +abort_multipart( + #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}}, + Key, + UploadId +) -> case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of ok -> ok; {error, Reason} -> ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}), {error, Reason} - end. + end; +abort_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId) -> + {error, {config_error, Reason}}. -spec list(client(), s3_options()) -> ok_or_error(proplists:proplist(), term()). -list(#{bucket := Bucket, aws_config := AwsConfig}, Options) -> +list(#{bucket := Bucket, aws_config := AwsConfig = #aws_config{}}, Options) -> try erlcloud_s3:list_objects(Bucket, Options, AwsConfig) of Result -> {ok, Result} catch error:{aws_error, Reason} -> ?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}), {error, Reason} - end. + end; +list(#{aws_config := {error, Reason}}, _Options) -> + {error, {config_error, Reason}}. -spec uri(client(), key()) -> iodata(). uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) -> @@ -221,35 +237,67 @@ headers(#{headers := Headers}) -> headers(#{}) -> []. -aws_config(#{ - scheme := Scheme, - host := Host, - port := Port, - access_key_id := AccessKeyId, - secret_access_key := SecretAccessKey, - http_pool := HttpPool, - pool_type := PoolType, - request_timeout := Timeout, - max_retries := MaxRetries -}) -> - #aws_config{ +aws_config( + Config = #{ + scheme := Scheme, + host := Host, + port := Port, + request_timeout := Timeout + } +) -> + ensure_aws_credentials(Config, #aws_config{ s3_scheme = Scheme, s3_host = Host, s3_port = Port, s3_bucket_access_method = path, s3_bucket_after_host = true, - access_key_id = AccessKeyId, - secret_access_key = emqx_secret:unwrap(SecretAccessKey), - - http_client = request_fun( - HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) - ), - %% This value will be transparently passed to ehttpc timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT), %% We rely on retry mechanism of ehttpc retry_num = 1 + }). + +ensure_aws_credentials( + Config = #{ + access_key_id := undefined, + secret_access_key := undefined + }, + AWSConfigIn +) -> + %% NOTE + %% Try to fetch credentials from the runtime environment (i.e. EC2 instance metadata). + %% Doing it before changing HTTP client to the one pinned to the S3 hostname. + case erlcloud_aws:update_config(AWSConfigIn) of + {ok, AWSConfig} -> + ensure_ehttpc_client(Config, AWSConfig); + {error, Reason} -> + {error, {failed_to_obtain_credentials, Reason}} + end; +ensure_aws_credentials( + Config = #{ + access_key_id := AccessKeyId, + secret_access_key := SecretAccessKey + }, + AWSConfigIn +) -> + ensure_ehttpc_client(Config, AWSConfigIn#aws_config{ + access_key_id = AccessKeyId, + secret_access_key = emqx_secret:unwrap(SecretAccessKey) + }). + +ensure_ehttpc_client( + #{ + http_pool := HttpPool, + pool_type := PoolType, + max_retries := MaxRetries + }, + AWSConfig +) -> + AWSConfig#aws_config{ + http_client = request_fun( + HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) + ) }. -spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun(). diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index 619a09e76..c186d1296 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -145,18 +145,37 @@ t_extra_headers(Config0) -> Client = client(Config), Opts = #{acl => public_read}, - Data = #{foo => bar}, + Data = #{<<"foo">> => <<"bar">>}, ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)), Url = emqx_s3_client:uri(Client, Key), {ok, {{_StatusLine, 200, "OK"}, _Headers, Content}} = httpc:request(get, {Url, []}, [{ssl, [{verify, verify_none}]}], []), - ?_assertEqual( + ?assertEqual( Data, emqx_utils_json:decode(Content) ). +t_no_credentials(Config) -> + Bucket = ?config(bucket, Config), + ProfileConfig = maps:merge( + profile_config(Config), + #{ + access_key_id => undefined, + secret_access_key => undefined + } + ), + ClientConfig = emqx_s3_profile_conf:client_config( + ProfileConfig, + ?config(ehttpc_pool_name, Config) + ), + Client = emqx_s3_client:create(Bucket, ClientConfig), + ?assertMatch( + {error, {config_error, {failed_to_obtain_credentials, _}}}, + emqx_s3_client:put_object(Client, ?config(key, Config), <<>>) + ). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 3867eee5e..45d293a6a 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -730,8 +730,8 @@ redact(Term, Checker) -> is_sensitive_key(V) orelse Checker(V) end). -do_redact(L, Checker) when is_list(L) -> - lists:map(fun(E) -> do_redact(E, Checker) end, L); +do_redact([E | Rest], Checker) -> + [do_redact(E, Checker) | do_redact(Rest, Checker)]; do_redact(M, Checker) when is_map(M) -> maps:map( fun(K, V) -> @@ -838,14 +838,8 @@ ipv6_probe_test() -> end. redact_test_() -> - Case = fun(Type, KeyT) -> - Key = - case Type of - atom -> KeyT; - string -> erlang:atom_to_list(KeyT); - binary -> erlang:atom_to_binary(KeyT) - end, - + Case = fun(TypeF, KeyIn) -> + Key = TypeF(KeyIn), ?assert(is_sensitive_key(Key)), %% direct @@ -866,10 +860,16 @@ redact_test_() -> %% 3 level nested ?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])), ?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])), - ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}])) - end, + ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}])), - Types = [atom, string, binary], + %% improper lists + ?assertEqual([{opts, [{Key, ?REDACT_VAL} | oops]}], redact([{opts, [{Key, foo} | oops]}])) + end, + Types = [ + {atom, fun identity/1}, + {string, fun emqx_utils_conv:str/1}, + {binary, fun emqx_utils_conv:bin/1} + ], Keys = [ authorization, aws_secret_access_key, @@ -882,7 +882,11 @@ redact_test_() -> token, bind_password ], - [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types]. + [ + {case_name(Type, Key), fun() -> Case(TypeF, Key) end} + || Key <- Keys, + {Type, TypeF} <- Types + ]. redact2_test_() -> Case = fun(Key, Checker) -> @@ -936,6 +940,9 @@ redact_is_authorization_test_() -> case_name(Type, Key) -> lists:concat([Type, "-", Key]). +identity(X) -> + X. + -endif. pub_props_to_packet(Properties) ->