diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index c055680b0..6ac6596ff 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -81,9 +81,6 @@ -type segment() :: {offset(), _Content :: binary()}. --define(STORE_SEGMENT_TIMEOUT, 10000). --define(ASSEMBLE_TIMEOUT, 300000). - %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -212,7 +209,7 @@ on_init(PacketId, Msg, Transfer, Meta) -> Callback = fun(Result) -> ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) end, - with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() -> case store_filemeta(Transfer, Meta) of % Stored, ack through the responder right away ok -> @@ -245,7 +242,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> Callback = fun(Result) -> ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) end, - with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() -> case store_segment(Transfer, Segment) of ok -> emqx_ft_responder:ack(PacketKey, ok); @@ -271,7 +268,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> Callback = fun(Result) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, - with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> + with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> case assemble(Transfer, FinalSize) of %% Assembling completed, ack through the responder right away % ok -> diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 5ca0b85ed..14a79b94c 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -26,6 +26,9 @@ -export([storage/0]). -export([gc_interval/1]). -export([segments_ttl/1]). +-export([init_timeout/0]). +-export([store_segment_timeout/0]). +-export([assemble_timeout/0]). %% Load/Unload -export([ @@ -86,6 +89,15 @@ assert_storage(Type) -> error({inapplicable, Conf}) end. +init_timeout() -> + emqx_config:get([file_transfer, init_timeout]). + +assemble_timeout() -> + emqx_config:get([file_transfer, assemble_timeout]). + +store_segment_timeout() -> + emqx_config:get([file_transfer, store_segment_timeout]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index beb43adc3..bc780874a 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -46,6 +46,33 @@ roots() -> [file_transfer]. fields(file_transfer) -> [ + {init_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("init_timeout"), + required => false, + default => "10s" + } + )}, + {store_segment_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("store_segment_timeout"), + required => false, + default => "5m" + } + )}, + {assemble_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("assemble_timeout"), + required => false, + default => "5m" + } + )}, {storage, mk( hoconsc:union([ diff --git a/apps/emqx_s3/src/emqx_s3_client.erl b/apps/emqx_s3/src/emqx_s3_client.erl index 6bb6c2e3b..e3e1d84af 100644 --- a/apps/emqx_s3/src/emqx_s3_client.erl +++ b/apps/emqx_s3/src/emqx_s3_client.erl @@ -199,10 +199,12 @@ format(#{aws_config := AwsConfig} = Client) -> %% Internal functions %%-------------------------------------------------------------------- -upload_options(Config) -> +upload_options(#{acl := Acl}) when Acl =/= undefined -> [ - {acl, maps:get(acl, Config)} - ]. + {acl, Acl} + ]; +upload_options(#{}) -> + []. headers(#{headers := Headers}) -> headers_user_to_erlcloud_request(Headers); diff --git a/apps/emqx_s3/src/emqx_s3_profile_conf.erl b/apps/emqx_s3/src/emqx_s3_profile_conf.erl index 3d9d6ed3f..3d66823a7 100644 --- a/apps/emqx_s3/src/emqx_s3_profile_conf.erl +++ b/apps/emqx_s3/src/emqx_s3_profile_conf.erl @@ -198,7 +198,7 @@ client_config(ProfileConfig, PoolName) -> port => maps:get(port, ProfileConfig), url_expire_time => maps:get(url_expire_time, ProfileConfig), headers => maps:get(headers, HTTPOpts, #{}), - acl => maps:get(acl, ProfileConfig), + acl => maps:get(acl, ProfileConfig, undefined), bucket => maps:get(bucket, ProfileConfig), access_key_id => maps:get(access_key_id, ProfileConfig, undefined), secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined), diff --git a/apps/emqx_s3/src/emqx_s3_schema.erl b/apps/emqx_s3/src/emqx_s3_schema.erl index 4d017deed..2d714bb7d 100644 --- a/apps/emqx_s3/src/emqx_s3_schema.erl +++ b/apps/emqx_s3/src/emqx_s3_schema.erl @@ -105,7 +105,7 @@ fields(s3) -> #{ default => private, desc => ?DESC("acl"), - required => true + required => false } )}, {transport_options, diff --git a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl index 4af803c67..cb55bc083 100644 --- a/apps/emqx_s3/test/emqx_s3_client_SUITE.erl +++ b/apps/emqx_s3/test/emqx_s3_client_SUITE.erl @@ -60,7 +60,8 @@ init_per_testcase(_TestCase, Config0) -> ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig), Config1 = [ {key, emqx_s3_test_helpers:unique_key()}, - {bucket, Bucket} + {bucket, Bucket}, + {aws_config, TestAwsConfig} | Config0 ], {ok, PoolName} = emqx_s3_profile_conf:start_http_pool(?PROFILE_ID, profile_config(Config1)), @@ -131,6 +132,16 @@ t_url(Config) -> httpc:request(Url) ). +t_no_acl(Config) -> + Key = ?config(key, Config), + + ClientConfig = emqx_s3_profile_conf:client_config( + profile_config(Config), ?config(ehttpc_pool_name, Config) + ), + Client = emqx_s3_client:create(maps:without([acl], ClientConfig)), + + ok = emqx_s3_client:put_object(Client, Key, <<"data">>). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon index 15cf17a39..9df02e920 100644 --- a/rel/i18n/emqx_ft_schema.hocon +++ b/rel/i18n/emqx_ft_schema.hocon @@ -1,5 +1,41 @@ emqx_ft_schema { + init_timeout { + desc { + en: "Timeout for initializing the file transfer.
" + "After reaching the timeout, `init` message will be acked with an error" + zh: "" + } + label { + en: "File Transfer Init Timeout" + zh: "" + } + } + + assemble_timeout { + desc { + en: "Timeout for assembling and exporting file segments into a final file.
" + "After reaching the timeout, `fin` message will be acked with an error" + zh: "" + } + label { + en: "File Assemble Timeout" + zh: "" + } + } + + store_segment_timeout { + desc { + en: "Timeout for storing a file segment.
" + "After reaching the timeout, message with the segment will be acked with an error" + zh: "" + } + label { + en: "Store Segment Timeout" + zh: "" + } + } + storage { desc { en: "Storage settings for file transfer."