diff --git a/apps/emqx/src/emqx_secret.erl b/apps/emqx/src/emqx_secret.erl index 5340f36ba..72c4f3c08 100644 --- a/apps/emqx/src/emqx_secret.erl +++ b/apps/emqx/src/emqx_secret.erl @@ -21,6 +21,10 @@ %% API: -export([wrap/1, unwrap/1]). +-export_type([t/1]). + +-opaque t(T) :: T | fun(() -> t(T)). + %%================================================================================ %% API funcions %%================================================================================ diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index b9f07d5c0..4db2255f6 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -79,7 +79,7 @@ start_export(_Options, Transfer, Filemeta) -> -spec write(export_st(), iodata()) -> {ok, export_st()} | {error, term()}. write(#{pid := Pid} = ExportSt, IoData) -> - case emqx_s3_uploader:write(Pid, IoData) of + case emqx_s3_uploader:write(Pid, IoData, emqx_ft_conf:store_segment_timeout()) of ok -> {ok, ExportSt}; {error, _Reason} = Error -> @@ -89,12 +89,13 @@ write(#{pid := Pid} = ExportSt, IoData) -> -spec complete(export_st(), emqx_ft:checksum()) -> ok | {error, term()}. complete(#{pid := Pid} = _ExportSt, _Checksum) -> - emqx_s3_uploader:complete(Pid). + emqx_s3_uploader:complete(Pid, emqx_ft_conf:assemble_timeout()). -spec discard(export_st()) -> ok. discard(#{pid := Pid} = _ExportSt) -> - emqx_s3_uploader:abort(Pid). + % NOTE: will abort upload asynchronously if needed + emqx_s3_uploader:shutdown(Pid). -spec list(options(), query()) -> {ok, page(exportinfo())} | {error, term()}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 010d004a1..7a0a6b3b4 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -267,8 +267,12 @@ lookup_assembler([Source | Sources]) -> check_if_already_exported(Storage, Transfer) -> case files(Storage, #{transfer => Transfer}) of - {ok, #{items := [_ | _]}} -> ok; - _ -> {error, not_found} + {ok, #{items := [_ | _]}} -> + % NOTE: we don't know coverage here, let's just clean up locally. + _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]), + ok; + _ -> + {error, not_found} end. lookup_local_assembler(Transfer) -> diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index a7ffd5675..842ae6bad 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -266,6 +266,38 @@ t_gc_incomplete_transfers(_Config) -> 0 ). +t_gc_repeated_transfer(_Config) -> + {local, Storage} = emqx_ft_storage:backend(), + Transfer = { + TID = {<<"clientclient">>, mk_file_id()}, + #{name => "repeat.please", segments_ttl => 10}, + emqx_ft_content_gen:new({?LINE, Size = 42}, 16) + }, + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats1}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + Size = start_transfer(Storage, Transfer), + {ok, {ok, #{stats := Stats2}}} = ?wait_async_action( + ?assertEqual(ok, complete_transfer(Storage, TID, Size)), + #{?snk_kind := garbage_collection}, + 1000 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats1 + ), + ?assertMatch( + #gcstats{files = 4, directories = 2}, + Stats2 + ), + ?assertEqual( + {ok, []}, + emqx_ft_storage_fs:list(Storage, TID, fragment) + ). + t_gc_handling_errors(_Config) -> ok = set_gc_config(minimum_segments_ttl, 0), ok = set_gc_config(maximum_segments_ttl, 0), @@ -349,14 +381,18 @@ complete_transfer(Storage, Transfer, Size) -> complete_transfer(Storage, Transfer, Size, 100). complete_transfer(Storage, Transfer, Size, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), - MRef = erlang:monitor(process, Pid), - Pid ! kickoff, - receive - {'DOWN', MRef, process, Pid, {shutdown, Result}} -> - Result - after Timeout -> - ct:fail("Assembler did not finish in time") + case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of + ok -> + ok; + {async, Pid} -> + MRef = erlang:monitor(process, Pid), + Pid ! kickoff, + receive + {'DOWN', MRef, process, Pid, {shutdown, Result}} -> + Result + after Timeout -> + ct:fail("Assembler did not finish in time") + end end. mk_file_id() -> diff --git a/apps/emqx_s3/src/emqx_s3.erl b/apps/emqx_s3/src/emqx_s3.erl index 0c2592736..cc48cdb93 100644 --- a/apps/emqx_s3/src/emqx_s3.erl +++ b/apps/emqx_s3/src/emqx_s3.erl @@ -44,7 +44,7 @@ -type profile_config() :: #{ bucket := string(), access_key_id => string(), - secret_access_key => string(), + secret_access_key => emqx_secret:t(string()), host := string(), port := pos_integer(), url_expire_time := pos_integer(), diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index 3bc5861c6..c062cf1ca 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -60,7 +60,7 @@ acl := emqx_s3:acl() | undefined, url_expire_time := pos_integer(), access_key_id := string() | undefined, - secret_access_key := string() | undefined, + secret_access_key := emqx_secret:t(string()) | undefined, http_pool := http_pool(), pool_type := pool_type(), request_timeout := timeout() | undefined, @@ -230,7 +230,7 @@ aws_config(#{ s3_bucket_after_host = true, access_key_id = AccessKeyId, - secret_access_key = SecretAccessKey, + secret_access_key = emqx_secret:unwrap(SecretAccessKey), http_client = request_fun( HttpPool, PoolType, with_default(MaxRetries, ?DEFAULT_MAX_RETRIES) diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 87f006bcb..a449640a6 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -11,7 +11,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include("src/emqx_s3.hrl"). +-include("emqx_s3.hrl"). -export([ start_link/2, @@ -377,10 +377,10 @@ stop_http_pool(ProfileId, PoolName) -> ok = ?tp(debug, "s3_stop_http_pool", #{pool_name => PoolName}). do_start_http_pool(PoolName, HttpConfig) -> - ?SLOG(warning, #{msg => "s3_start_http_pool", pool_name => PoolName, config => HttpConfig}), + ?SLOG(debug, #{msg => "s3_starting_http_pool", pool_name => PoolName, config => HttpConfig}), case ehttpc_sup:start_pool(PoolName, HttpConfig) of {ok, _} -> - ?SLOG(warning, #{msg => "s3_start_http_pool_success", pool_name => PoolName}), + ?SLOG(info, #{msg => "s3_start_http_pool_success", pool_name => PoolName}), ok; {error, _} = Error -> ?SLOG(error, #{msg => "s3_start_http_pool_fail", pool_name => PoolName, error => Error}), diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 5866f8c2b..f02364969 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -34,11 +34,12 @@ fields(s3) -> )}, {secret_access_key, mk( - string(), + hoconsc:union([string(), function()]), #{ desc => ?DESC("secret_access_key"), required => false, - sensitive => true + sensitive => true, + converter => fun secret/2 } )}, {bucket, @@ -124,7 +125,7 @@ fields(transport_options) -> mk( boolean(), #{ - default => true, + default => false, desc => ?DESC("ipv6_probe"), required => false } @@ -142,6 +143,14 @@ desc(s3) -> desc(transport_options) -> "Options for the HTTP transport layer used by the S3 client". +secret(undefined, #{}) -> + undefined; +secret(Secret, #{make_serializable := true}) -> + unicode:characters_to_binary(emqx_secret:unwrap(Secret)); +secret(Secret, #{}) -> + _ = is_binary(Secret) orelse throw({expected_type, string}), + emqx_secret:wrap(unicode:characters_to_list(Secret)). + translate(Conf) -> translate(Conf, #{}). diff --git a/apps/emqx_s3/src/emqx_s3_uploader.erl b/apps/emqx_s3/src/emqx_s3_uploader.erl index 595612f62..aa547c7cc 100644 --- a/apps/emqx_s3/src/emqx_s3_uploader.erl +++ b/apps/emqx_s3/src/emqx_s3_uploader.erl @@ -18,7 +18,9 @@ complete/2, abort/1, - abort/2 + abort/2, + + shutdown/1 ]). -export([ @@ -87,6 +89,11 @@ abort(Pid) -> abort(Pid, Timeout) -> gen_statem:call(Pid, abort, Timeout). +-spec shutdown(pid()) -> ok. +shutdown(Pid) -> + _ = erlang:exit(Pid, shutdown), + ok. + %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl index 63f659da0..3c7753857 100644 --- a/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_schema_SUITE.erl @@ -49,7 +49,7 @@ t_full_config(_Config) -> host := "s3.us-east-1.endpoint.com", min_part_size := 10485760, port := 443, - secret_access_key := "secret_access_key", + secret_access_key := Secret, transport_options := #{ connect_timeout := 30000, @@ -74,7 +74,7 @@ t_full_config(_Config) -> versions := ['tlsv1.2'] } } - }, + } when is_function(Secret), emqx_s3_schema:translate(#{ <<"access_key_id">> => <<"access_key_id">>, <<"secret_access_key">> => <<"secret_access_key">>, @@ -126,6 +126,26 @@ t_sensitive_config_hidden(_Config) -> ) ). +t_sensitive_config_no_leak(_Config) -> + ?assertThrow( + {emqx_s3_schema, [ + Error = #{ + kind := validation_error, + path := "s3.secret_access_key", + reason := {expected_type, string} + } + ]} when map_size(Error) == 3, + emqx_s3_schema:translate( + #{ + <<"bucket">> => <<"bucket">>, + <<"host">> => <<"s3.us-east-1.endpoint.com">>, + <<"port">> => 443, + <<"access_key_id">> => <<"access_key_id">>, + <<"secret_access_key">> => #{<<"1">> => <<"secret_access_key">>} + } + ) + ). + t_invalid_limits(_Config) -> ?assertException( throw,