From 3278158c66b02fb5be7e1085976b6f182518a9e3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 25 May 2023 12:59:33 +0300 Subject: [PATCH 1/6] fix(ft): disable `ipv6_probe` by default for S3 clients It's often demonstrates degraded performance on common setups, e.g. containers with virtualized network interfaces. --- apps/emqx_s3/src/emqx_s3_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 5866f8c2b..a9b5e97e9 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -124,7 +124,7 @@ fields(transport_options) -> mk( boolean(), #{ - default => true, + default => false, desc => ?DESC("ipv6_probe"), required => false } From 75cf562c903662f27ccc410de619d2659074ab84 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 25 May 2023 13:02:04 +0300 Subject: [PATCH 2/6] fix(ft): tune some logging levels --- apps/emqx_s3/src/emqx_s3_profile_conf.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 87f006bcb..a449640a6 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -11,7 +11,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include("src/emqx_s3.hrl"). +-include("emqx_s3.hrl"). -export([ start_link/2, @@ -377,10 +377,10 @@ stop_http_pool(ProfileId, PoolName) -> ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}). do_start_http_pool(PoolName, HttpConfig) -> - ?SLOG(warning, #{msg => "s3_start_http_pool", pool_name => PoolName, config => HttpConfig}), + ?SLOG(debug, #{msg => "s3_starting_http_pool", pool_name => PoolName, config => HttpConfig}), case ehttpc_sup:start_pool(PoolName, HttpConfig) of {ok, _} -> - ?SLOG(warning, #{msg => "s3_start_http_pool_success", pool_name => PoolName}), + ?SLOG(info, #{msg => "s3_start_http_pool_success", pool_name => PoolName}), ok; {error, _} = Error -> ?SLOG(error, #{msg => "s3_start_http_pool_fail", pool_name => PoolName, error => Error}), From d36c2c2928b59a982a303dd764058a2af155cfc5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 25 May 2023 13:43:22 +0300 Subject: [PATCH 3/6] fix(ft-gc): ensure GC of already complete transfers --- apps/emqx_ft/src/emqx_ft_storage_fs.erl | 8 ++- .../test/emqx_ft_storage_fs_gc_SUITE.erl | 52 ++++++++++++++++--- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 010d004a1..7a0a6b3b4 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -267,8 +267,12 @@ lookup_assembler([Source | Sources]) -> check_if_already_exported(Storage, Transfer) -> case files(Storage, #{transfer => Transfer}) of - {ok, #{items := [_ | _]}} -> ok; - _ -> {error, not_found} + {ok, #{items := [_ | _]}} -> + % NOTE: we don't know coverage here, let's just clean up locally. + _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]), + ok; + _ -> + {error, not_found} end. lookup_local_assembler(Transfer) -> diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index a7ffd5675..842ae6bad 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -266,6 +266,38 @@ t_gc_incomplete_transfers(_Config) -> 0 ). +t_gc_repeated_transfer(_Config) -> + {local, Storage} = emqx_ft_storage:backend(), + Transfer = { + TID = {<<"clientclient">>, mk_file_id()}, + #{name => "repeat.please", segments_ttl => 10}, + emqx_ft_content_gen:new({?LINE, Size = 42}, 16) + }, + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats1}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats2}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats1 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats2 + ), + ?assertEqual( + {ok, []}, + emqx_ft_storage_fs:list(Storage, TID, fragment) + ). + t_gc_handling_errors(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 0), @@ -349,14 +381,18 @@ complete_transfer(Storage, Transfer, Size) -> complete_transfer(Storage, Transfer, Size, 100). complete_transfer(Storage, Transfer, Size, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), - MRef = erlang:monitor(process, Pid), - Pid ! kickoff, - receive - {'DOWN', MRef, process, Pid, {shutdown, Result}} -> - Result - after Timeout -> - ct:fail("Assembler did not finish in time") + case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of + ok -> + ok; + {async, Pid} -> + MRef = erlang:monitor(process, Pid), + Pid ! kickoff, + receive + {'DOWN', MRef, process, Pid, {shutdown, Result}} -> + Result + after Timeout -> + ct:fail("Assembler did not finish in time") + end end. mk_file_id() -> From cc3275b38913268f9824c279b218ba6bc5b4c39f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 25 May 2023 18:37:22 +0300 Subject: [PATCH 4/6] fix(ft): use conservative timeouts for S3 uploader calls --- apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index b9f07d5c0..0d6086259 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -79,7 +79,7 @@ start_export(_Options, Transfer, Filemeta) -> -spec write(export_st(), iodata()) -> {ok, export_st()} | {error, term()}. write(#{pid := Pid} = ExportSt, IoData) -> - case emqx_s3_uploader:write(Pid, IoData) of + case emqx_s3_uploader:write(Pid, IoData, emqx_ft_conf:store_segment_timeout()) of ok -> {ok, ExportSt}; {error, _Reason} = Error -> @@ -89,7 +89,7 @@ write(#{pid := Pid} = ExportSt, IoData) -> -spec complete(export_st(), emqx_ft:checksum()) -> ok | {error, term()}. complete(#{pid := Pid} = _ExportSt, _Checksum) -> - emqx_s3_uploader:complete(Pid). + emqx_s3_uploader:complete(Pid, emqx_ft_conf:assemble_timeout()). -spec discard(export_st()) -> ok. From 3d428a2e0ec9128f5a11d597036a4d24a945d46e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 25 May 2023 19:18:16 +0300 Subject: [PATCH 5/6] fix(ft-s3): rely on asynchronous upload abort So that upload aborts would not block assemblers. This should not affect the expected behavior since S3 API usually allows having concurrent uploads, plus we couldn't really guarantee successful aborts anyway. --- apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl | 3 ++- apps/emqx_s3/src/emqx_s3_uploader.erl | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index 0d6086259..4db2255f6 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -94,7 +94,8 @@ complete(#{pid := Pid} = _ExportSt, _Checksum) -> -spec discard(export_st()) -> ok. discard(#{pid := Pid} = _ExportSt) -> - emqx_s3_uploader:abort(Pid). + % NOTE: will abort upload asynchronously if needed + emqx_s3_uploader:shutdown(Pid). -spec list(options(), query()) -> {ok, page(exportinfo())} | {error, term()}. diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index 595612f62..aa547c7cc 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -18,7 +18,9 @@ complete/2, abort/1, - abort/2 + abort/2, + + shutdown/1 ]). -export([ @@ -87,6 +89,11 @@ abort(Pid) -> abort(Pid, Timeout) -> gen_statem:call(Pid, abort, Timeout). +-spec shutdown(pid()) -> ok. +shutdown(Pid) -> + _ = erlang:exit(Pid, shutdown), + ok. + %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- From 48858dee3392ff420be65470febc3d0c7ec75dca Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 26 May 2023 00:27:00 +0300 Subject: [PATCH 6/6] fix(s3): wrap S3 secrets during config load --- apps/emqx/src/emqx_secret.erl | 4 ++++ apps/emqx_s3/src/emqx_s3.erl | 2 +- apps/emqx_s3/src/emqx_s3_client.erl | 4 ++-- apps/emqx_s3/src/emqx_s3_schema.erl | 13 ++++++++++-- apps/emqx_s3/test/emqx_s3_schema_SUITE.erl | 24 ++++++++++++++++++++-- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_secret.erl b/apps/emqx/src/emqx_secret.erl index 5340f36ba..72c4f3c08 100644 --- a/apps/emqx/src/emqx_secret.erl +++ b/apps/emqx/src/emqx_secret.erl @@ -21,6 +21,10 @@ %% API: -export([wrap/1, unwrap/1]). +-export_type([t/1]). + +-opaque t(T) :: T | fun(() -> t(T)). + %%================================================================================ %% API funcions %%================================================================================ diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 0c2592736..cc48cdb93 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -44,7 +44,7 @@ -type profile_config() :: #{ bucket := string(), access_key_id => string(), - secret_access_key => string(), + secret_access_key => emqx_secret:t(string()), host := string(), port := pos_integer(), url_expire_time := pos_integer(), diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index 3bc5861c6..c062cf1ca 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -60,7 +60,7 @@ acl := emqx_s3:acl() | undefined, url_expire_time := pos_integer(), access_key_id := string() | undefined, - secret_access_key := string() | undefined, + secret_access_key := emqx_secret:t(string()) | undefined, http_pool := http_pool(), pool_type := pool_type(), request_timeout := timeout() | undefined, @@ -230,7 +230,7 @@ aws_config(#{ s3_bucket_after_host = true, access_key_id = AccessKeyId, - secret_access_key = SecretAccessKey, + secret_access_key = emqx_secret:unwrap(SecretAccessKey), http_client = request_fun( HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index a9b5e97e9..f02364969 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -34,11 +34,12 @@ fields(s3) -> )}, {secret_access_key, mk( - string(), + hoconsc:union([string(), function()]), #{ desc => ?DESC("secret_access_key"), required => false, - sensitive => true + sensitive => true, + converter => fun secret/2 } )}, {bucket, @@ -142,6 +143,14 @@ desc(s3) -> desc(transport_options) -> "Options for the HTTP transport layer used by the S3 client". +secret(undefined, #{}) -> + undefined; +secret(Secret, #{make_serializable := true}) -> + unicode:characters_to_binary(emqx_secret:unwrap(Secret)); +secret(Secret, #{}) -> + _ = is_binary(Secret) orelse throw({expected_type, string}), + emqx_secret:wrap(unicode:characters_to_list(Secret)). + translate(Conf) -> translate(Conf, #{}). diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index 63f659da0..3c7753857 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -49,7 +49,7 @@ t_full_config(_Config) -> host := "s3.us-east-1.endpoint.com", min_part_size := 10485760, port := 443, - secret_access_key := "secret_access_key", + secret_access_key := Secret, transport_options := #{ connect_timeout := 30000, @@ -74,7 +74,7 @@ t_full_config(_Config) -> versions := ['tlsv1.2'] } } - }, + } when is_function(Secret), emqx_s3_schema:translate(#{ <<"access_key_id">> => <<"access_key_id">>, <<"secret_access_key">> => <<"secret_access_key">>, @@ -126,6 +126,26 @@ t_sensitive_config_hidden(_Config) -> ) ). +t_sensitive_config_no_leak(_Config) -> + ?assertThrow( + {emqx_s3_schema, [ + Error = #{ + kind := validation_error, + path := "s3.secret_access_key", + reason := {expected_type, string} + } + ]} when map_size(Error) == 3, + emqx_s3_schema:translate( + #{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"access_key_id">> => <<"access_key_id">>, + <<"secret_access_key">> => #{<<"1">> => <<"secret_access_key">>} + } + ) + ). + t_invalid_limits(_Config) -> ?assertException( throw,