Merge pull request #12568 from keynslug/ft/EMQX-11888/s3-redact
fix(s3): try to obtain credentials with default lhttpc
commit 54499175d5
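The hunks below touch the S3 connector health check, the S3 client module (`emqx_s3_client`), its test suite, and the `redact` helpers in `emqx_utils`. The practical effect for callers is that `emqx_s3_client:aws_config/1` may now return `{error, Reason}` when no static credentials are configured and none can be fetched from the runtime environment, and every client operation maps that to `{error, {config_error, Reason}}` instead of crashing. A minimal caller-side sketch of the new error shape (illustrative only; `handle_put/3` is a hypothetical helper, not part of this change):

```erlang
%% Hypothetical caller, shown only to illustrate the error shape
%% introduced by this change.
handle_put(Client, Key, Data) ->
    case emqx_s3_client:put_object(Client, Key, Data) of
        ok ->
            ok;
        {error, {config_error, {failed_to_obtain_credentials, _}} = Reason} ->
            %% Neither static credentials nor runtime credentials
            %% (e.g. EC2 instance metadata) were available.
            {error, Reason};
        {error, Reason} ->
            {error, Reason}
    end.
```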
@@ -105,14 +105,19 @@ on_stop(InstId, _State = #{pool_name := PoolName}) ->
 -spec on_get_status(_InstanceId :: resource_id(), state()) ->
     health_check_status().
 on_get_status(_InstId, State = #{client_config := Config}) ->
-    try erlcloud_s3:list_buckets(emqx_s3_client:aws_config(Config)) of
-        Props when is_list(Props) ->
-            ?status_connected
-    catch
-        error:{aws_error, {http_error, _Code, _, Reason}} ->
+    case emqx_s3_client:aws_config(Config) of
+        {error, Reason} ->
             {?status_disconnected, State, Reason};
-        error:{aws_error, {socket_error, Reason}} ->
-            {?status_disconnected, State, Reason}
+        AWSConfig ->
+            try erlcloud_s3:list_buckets(AWSConfig) of
+                Props when is_list(Props) ->
+                    ?status_connected
+            catch
+                error:{aws_error, {http_error, _Code, _, Reason}} ->
+                    {?status_disconnected, State, Reason};
+                error:{aws_error, {socket_error, Reason}} ->
+                    {?status_disconnected, State, Reason}
+            end
     end.
 
 -spec on_add_channel(_InstanceId :: resource_id(), state(), channel_id(), channel_config()) ->
@@ -103,7 +103,7 @@ put_object(Client, Key, Value) ->
 
 -spec put_object(client(), key(), upload_options(), iodata()) -> ok_or_error(term()).
 put_object(
-    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
+    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}},
     Key,
     UploadOpts,
     Content
@@ -118,11 +118,13 @@ put_object(
         error:{aws_error, Reason} ->
             ?SLOG(debug, #{msg => "put_object_fail", key => Key, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+put_object(#{aws_config := {error, Reason}}, _Key, _UploadOpts, _Content) ->
+    {error, {config_error, Reason}}.
 
 -spec start_multipart(client(), key(), upload_options()) -> ok_or_error(upload_id(), term()).
 start_multipart(
-    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig},
+    #{bucket := Bucket, headers := BaseHeaders, aws_config := AwsConfig = #aws_config{}},
     Key,
     UploadOpts
 ) ->
@@ -135,12 +137,14 @@ start_multipart(
         {error, Reason} ->
             ?SLOG(debug, #{msg => "start_multipart_fail", key => Key, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+start_multipart(#{aws_config := {error, Reason}}, _Key, _UploadOpts) ->
+    {error, {config_error, Reason}}.
 
 -spec upload_part(client(), key(), upload_id(), part_number(), iodata()) ->
     ok_or_error(etag(), term()).
 upload_part(
-    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
+    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
     Key,
     UploadId,
     PartNumber,
@@ -156,11 +160,13 @@ upload_part(
         {error, Reason} ->
             ?SLOG(debug, #{msg => "upload_part_fail", key => Key, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+upload_part(#{aws_config := {error, Reason}}, _Key, _UploadId, _PartNumber, _Value) ->
+    {error, {config_error, Reason}}.
 
 -spec complete_multipart(client(), key(), upload_id(), [etag()]) -> ok_or_error(term()).
 complete_multipart(
-    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig},
+    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
     Key,
     UploadId,
     ETags
@@ -175,27 +181,37 @@ complete_multipart(
         {error, Reason} ->
             ?SLOG(debug, #{msg => "complete_multipart_fail", key => Key, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+complete_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId, _ETags) ->
+    {error, {config_error, Reason}}.
 
 -spec abort_multipart(client(), key(), upload_id()) -> ok_or_error(term()).
-abort_multipart(#{bucket := Bucket, headers := Headers, aws_config := AwsConfig}, Key, UploadId) ->
+abort_multipart(
+    #{bucket := Bucket, headers := Headers, aws_config := AwsConfig = #aws_config{}},
+    Key,
+    UploadId
+) ->
     case erlcloud_s3:abort_multipart(Bucket, erlcloud_key(Key), UploadId, [], Headers, AwsConfig) of
         ok ->
             ok;
         {error, Reason} ->
             ?SLOG(debug, #{msg => "abort_multipart_fail", key => Key, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+abort_multipart(#{aws_config := {error, Reason}}, _Key, _UploadId) ->
+    {error, {config_error, Reason}}.
 
 -spec list(client(), s3_options()) -> ok_or_error(proplists:proplist(), term()).
-list(#{bucket := Bucket, aws_config := AwsConfig}, Options) ->
+list(#{bucket := Bucket, aws_config := AwsConfig = #aws_config{}}, Options) ->
     try erlcloud_s3:list_objects(Bucket, Options, AwsConfig) of
         Result -> {ok, Result}
     catch
         error:{aws_error, Reason} ->
             ?SLOG(debug, #{msg => "list_objects_fail", bucket => Bucket, reason => Reason}),
             {error, Reason}
-    end.
+    end;
+list(#{aws_config := {error, Reason}}, _Options) ->
+    {error, {config_error, Reason}}.
 
 -spec uri(client(), key()) -> iodata().
 uri(#{bucket := Bucket, aws_config := AwsConfig, url_expire_time := ExpireTime}, Key) ->
@@ -221,35 +237,67 @@ headers(#{headers := Headers}) ->
 headers(#{}) ->
     [].
 
-aws_config(#{
-    scheme := Scheme,
-    host := Host,
-    port := Port,
-    access_key_id := AccessKeyId,
-    secret_access_key := SecretAccessKey,
-    http_pool := HttpPool,
-    pool_type := PoolType,
-    request_timeout := Timeout,
-    max_retries := MaxRetries
-}) ->
-    #aws_config{
+aws_config(
+    Config = #{
+        scheme := Scheme,
+        host := Host,
+        port := Port,
+        request_timeout := Timeout
+    }
+) ->
+    ensure_aws_credentials(Config, #aws_config{
         s3_scheme = Scheme,
         s3_host = Host,
         s3_port = Port,
         s3_bucket_access_method = path,
         s3_bucket_after_host = true,
 
-        access_key_id = AccessKeyId,
-        secret_access_key = emqx_secret:unwrap(SecretAccessKey),
-
-        http_client = request_fun(
-            HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)
-        ),
-
         %% This value will be transparently passed to ehttpc
         timeout = with_default(Timeout, ?DEFAULT_REQUEST_TIMEOUT),
         %% We rely on retry mechanism of ehttpc
         retry_num = 1
-    }.
+    }).
+
+ensure_aws_credentials(
+    Config = #{
+        access_key_id := undefined,
+        secret_access_key := undefined
+    },
+    AWSConfigIn
+) ->
+    %% NOTE
+    %% Try to fetch credentials from the runtime environment (i.e. EC2 instance metadata).
+    %% Doing it before changing HTTP client to the one pinned to the S3 hostname.
+    case erlcloud_aws:update_config(AWSConfigIn) of
+        {ok, AWSConfig} ->
+            ensure_ehttpc_client(Config, AWSConfig);
+        {error, Reason} ->
+            {error, {failed_to_obtain_credentials, Reason}}
+    end;
+ensure_aws_credentials(
+    Config = #{
+        access_key_id := AccessKeyId,
+        secret_access_key := SecretAccessKey
+    },
+    AWSConfigIn
+) ->
+    ensure_ehttpc_client(Config, AWSConfigIn#aws_config{
+        access_key_id = AccessKeyId,
+        secret_access_key = emqx_secret:unwrap(SecretAccessKey)
+    }).
+
+ensure_ehttpc_client(
+    #{
+        http_pool := HttpPool,
+        pool_type := PoolType,
+        max_retries := MaxRetries
+    },
+    AWSConfig
+) ->
+    AWSConfig#aws_config{
+        http_client = request_fun(
+            HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES)
+        )
+    }.
 
 -spec request_fun(http_pool(), pool_type(), non_neg_integer()) -> erlcloud_httpc:request_fun().
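With the refactor above, credential resolution now happens before the HTTP client is swapped to the ehttpc pool pinned to the S3 host, so `erlcloud_aws:update_config/1` runs with erlcloud's default lhttpc client (which is what the commit title refers to). A rough sketch of the resulting `aws_config/1` contract, assuming the config map fields matched above (all values here are invented for illustration):

```erlang
%% Illustrative only; field values and the pool name are made up.
aws_config_example() ->
    ClientConfig = #{
        scheme => "https://",
        host => "s3.us-east-1.amazonaws.com",
        port => 443,
        %% With both credentials undefined, aws_config/1 falls back to
        %% erlcloud_aws:update_config/1 (env vars / instance metadata).
        access_key_id => undefined,
        secret_access_key => undefined,
        http_pool => example_pool,
        pool_type => random,
        request_timeout => undefined,
        max_retries => undefined
    },
    case emqx_s3_client:aws_config(ClientConfig) of
        {error, {failed_to_obtain_credentials, _}} = Error ->
            Error;
        AWSConfig ->
            %% An #aws_config{} record whose http_client is the ehttpc
            %% request fun built by ensure_ehttpc_client/2 above.
            {ok, AWSConfig}
    end.
```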
@@ -145,18 +145,37 @@ t_extra_headers(Config0) ->
 
     Client = client(Config),
     Opts = #{acl => public_read},
-    Data = #{foo => bar},
+    Data = #{<<"foo">> => <<"bar">>},
     ok = emqx_s3_client:put_object(Client, Key, Opts, emqx_utils_json:encode(Data)),
 
     Url = emqx_s3_client:uri(Client, Key),
 
     {ok, {{_StatusLine, 200, "OK"}, _Headers, Content}} =
         httpc:request(get, {Url, []}, [{ssl, [{verify, verify_none}]}], []),
-    ?_assertEqual(
+    ?assertEqual(
         Data,
         emqx_utils_json:decode(Content)
     ).
 
+t_no_credentials(Config) ->
+    Bucket = ?config(bucket, Config),
+    ProfileConfig = maps:merge(
+        profile_config(Config),
+        #{
+            access_key_id => undefined,
+            secret_access_key => undefined
+        }
+    ),
+    ClientConfig = emqx_s3_profile_conf:client_config(
+        ProfileConfig,
+        ?config(ehttpc_pool_name, Config)
+    ),
+    Client = emqx_s3_client:create(Bucket, ClientConfig),
+    ?assertMatch(
+        {error, {config_error, {failed_to_obtain_credentials, _}}},
+        emqx_s3_client:put_object(Client, ?config(key, Config), <<>>)
+    ).
+
 %%--------------------------------------------------------------------
 %% Helpers
 %%--------------------------------------------------------------------
@@ -730,8 +730,8 @@ redact(Term, Checker) ->
         is_sensitive_key(V) orelse Checker(V)
     end).
 
-do_redact(L, Checker) when is_list(L) ->
-    lists:map(fun(E) -> do_redact(E, Checker) end, L);
+do_redact([E | Rest], Checker) ->
+    [do_redact(E, Checker) | do_redact(Rest, Checker)];
 do_redact(M, Checker) when is_map(M) ->
     maps:map(
         fun(K, V) ->
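The `do_redact/2` change swaps the `lists:map/2`-based clause for explicit head/tail recursion, so redaction also works on improper lists (a cons cell whose tail is not a list), which previously crashed inside `lists:map/2`; the new test case further down exercises exactly that. A small illustration via the public `redact/1` entry point (a sketch, assuming the non-list fallback clause leaves the improper tail untouched):

```erlang
%% Illustrative only: an improper list is now redacted instead of crashing.
%% The sensitive value is replaced with the usual redaction placeholder,
%% and the improper tail (`trailing_atom`) is passed through.
improper_redact_example() ->
    emqx_utils:redact([{password, <<"secret">>} | trailing_atom]).
```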
@@ -838,14 +838,8 @@ ipv6_probe_test() ->
     end.
 
 redact_test_() ->
-    Case = fun(Type, KeyT) ->
-        Key =
-            case Type of
-                atom -> KeyT;
-                string -> erlang:atom_to_list(KeyT);
-                binary -> erlang:atom_to_binary(KeyT)
-            end,
-
+    Case = fun(TypeF, KeyIn) ->
+        Key = TypeF(KeyIn),
         ?assert(is_sensitive_key(Key)),
 
         %% direct
@@ -866,10 +860,16 @@ redact_test_() ->
         %% 3 level nested
         ?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])),
         ?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])),
-        ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}]))
-    end,
-
-    Types = [atom, string, binary],
+        ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}])),
+        %% improper lists
+        ?assertEqual([{opts, [{Key, ?REDACT_VAL} | oops]}], redact([{opts, [{Key, foo} | oops]}]))
+    end,
+
+    Types = [
+        {atom, fun identity/1},
+        {string, fun emqx_utils_conv:str/1},
+        {binary, fun emqx_utils_conv:bin/1}
+    ],
     Keys = [
         authorization,
         aws_secret_access_key,
@@ -882,7 +882,11 @@ redact_test_() ->
         token,
         bind_password
     ],
-    [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
+    [
+        {case_name(Type, Key), fun() -> Case(TypeF, Key) end}
+     || Key <- Keys,
+        {Type, TypeF} <- Types
+    ].
 
 redact2_test_() ->
     Case = fun(Key, Checker) ->
@@ -936,6 +940,9 @@ redact_is_authorization_test_() ->
 case_name(Type, Key) ->
     lists:concat([Type, "-", Key]).
 
+identity(X) ->
+    X.
+
 -endif.
 
 pub_props_to_packet(Properties) ->