Merge pull request #10393 from savonarola/0413-file-transfer-options

0413 file transfer options
This commit is contained in:
Ilya Averyanov 2023-04-14 11:56:59 +03:00 committed by GitHub
commit 15887843bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 97 additions and 12 deletions

View File

@ -81,9 +81,6 @@
-type segment() :: {offset(), _Content :: binary()}. -type segment() :: {offset(), _Content :: binary()}.
-define(STORE_SEGMENT_TIMEOUT, 10000).
-define(ASSEMBLE_TIMEOUT, 300000).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API for app %% API for app
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -212,7 +209,7 @@ on_init(PacketId, Msg, Transfer, Meta) ->
Callback = fun(Result) -> Callback = fun(Result) ->
?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
end, end,
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() ->
case store_filemeta(Transfer, Meta) of case store_filemeta(Transfer, Meta) of
% Stored, ack through the responder right away % Stored, ack through the responder right away
ok -> ok ->
@ -245,7 +242,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
Callback = fun(Result) -> Callback = fun(Result) ->
?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
end, end,
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() ->
case store_segment(Transfer, Segment) of case store_segment(Transfer, Segment) of
ok -> ok ->
emqx_ft_responder:ack(PacketKey, ok); emqx_ft_responder:ack(PacketKey, ok);
@ -271,7 +268,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
Callback = fun(Result) -> Callback = fun(Result) ->
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
end, end,
with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
case assemble(Transfer, FinalSize) of case assemble(Transfer, FinalSize) of
%% Assembling completed, ack through the responder right away %% Assembling completed, ack through the responder right away
% ok -> % ok ->

View File

@ -26,6 +26,9 @@
-export([storage/0]). -export([storage/0]).
-export([gc_interval/1]). -export([gc_interval/1]).
-export([segments_ttl/1]). -export([segments_ttl/1]).
-export([init_timeout/0]).
-export([store_segment_timeout/0]).
-export([assemble_timeout/0]).
%% Load/Unload %% Load/Unload
-export([ -export([
@ -86,6 +89,15 @@ assert_storage(Type) ->
error({inapplicable, Conf}) error({inapplicable, Conf})
end. end.
init_timeout() ->
emqx_config:get([file_transfer, init_timeout]).
assemble_timeout() ->
emqx_config:get([file_transfer, assemble_timeout]).
store_segment_timeout() ->
emqx_config:get([file_transfer, store_segment_timeout]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -46,6 +46,33 @@ roots() -> [file_transfer].
fields(file_transfer) -> fields(file_transfer) ->
[ [
{init_timeout,
mk(
emqx_schema:duration_ms(),
#{
desc => ?DESC("init_timeout"),
required => false,
default => "10s"
}
)},
{store_segment_timeout,
mk(
emqx_schema:duration_ms(),
#{
desc => ?DESC("store_segment_timeout"),
required => false,
default => "5m"
}
)},
{assemble_timeout,
mk(
emqx_schema:duration_ms(),
#{
desc => ?DESC("assemble_timeout"),
required => false,
default => "5m"
}
)},
{storage, {storage,
mk( mk(
hoconsc:union([ hoconsc:union([

View File

@ -199,10 +199,12 @@ format(#{aws_config := AwsConfig} = Client) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
upload_options(Config) -> upload_options(#{acl := Acl}) when Acl =/= undefined ->
[ [
{acl, maps:get(acl, Config)} {acl, Acl}
]. ];
upload_options(#{}) ->
[].
headers(#{headers := Headers}) -> headers(#{headers := Headers}) ->
headers_user_to_erlcloud_request(Headers); headers_user_to_erlcloud_request(Headers);

View File

@ -198,7 +198,7 @@ client_config(ProfileConfig, PoolName) ->
port => maps:get(port, ProfileConfig), port => maps:get(port, ProfileConfig),
url_expire_time => maps:get(url_expire_time, ProfileConfig), url_expire_time => maps:get(url_expire_time, ProfileConfig),
headers => maps:get(headers, HTTPOpts, #{}), headers => maps:get(headers, HTTPOpts, #{}),
acl => maps:get(acl, ProfileConfig), acl => maps:get(acl, ProfileConfig, undefined),
bucket => maps:get(bucket, ProfileConfig), bucket => maps:get(bucket, ProfileConfig),
access_key_id => maps:get(access_key_id, ProfileConfig, undefined), access_key_id => maps:get(access_key_id, ProfileConfig, undefined),
secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined), secret_access_key => maps:get(secret_access_key, ProfileConfig, undefined),

View File

@ -105,7 +105,7 @@ fields(s3) ->
#{ #{
default => private, default => private,
desc => ?DESC("acl"), desc => ?DESC("acl"),
required => true required => false
} }
)}, )},
{transport_options, {transport_options,

View File

@ -60,7 +60,8 @@ init_per_testcase(_TestCase, Config0) ->
ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig), ok = erlcloud_s3:create_bucket(Bucket, TestAwsConfig),
Config1 = [ Config1 = [
{key, emqx_s3_test_helpers:unique_key()}, {key, emqx_s3_test_helpers:unique_key()},
{bucket, Bucket} {bucket, Bucket},
{aws_config, TestAwsConfig}
| Config0 | Config0
], ],
{ok, PoolName} = emqx_s3_profile_conf:start_http_pool(?PROFILE_ID, profile_config(Config1)), {ok, PoolName} = emqx_s3_profile_conf:start_http_pool(?PROFILE_ID, profile_config(Config1)),
@ -131,6 +132,16 @@ t_url(Config) ->
httpc:request(Url) httpc:request(Url)
). ).
t_no_acl(Config) ->
Key = ?config(key, Config),
ClientConfig = emqx_s3_profile_conf:client_config(
profile_config(Config), ?config(ehttpc_pool_name, Config)
),
Client = emqx_s3_client:create(maps:without([acl], ClientConfig)),
ok = emqx_s3_client:put_object(Client, Key, <<"data">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -1,5 +1,41 @@
emqx_ft_schema { emqx_ft_schema {
init_timeout {
desc {
en: "Timeout for initializing the file transfer.<br/>"
"After reaching the timeout, `init` message will be acked with an error"
zh: ""
}
label {
en: "File Transfer Init Timeout"
zh: ""
}
}
assemble_timeout {
desc {
en: "Timeout for assembling and exporting file segments into a final file.<br/>"
"After reaching the timeout, `fin` message will be acked with an error"
zh: ""
}
label {
en: "File Assemble Timeout"
zh: ""
}
}
store_segment_timeout {
desc {
en: "Timeout for storing a file segment.<br/>"
"After reaching the timeout, message with the segment will be acked with an error"
zh: ""
}
label {
en: "Store Segment Timeout"
zh: ""
}
}
storage { storage {
desc { desc {
en: "Storage settings for file transfer." en: "Storage settings for file transfer."