feat(ft): make timeouts configurable
This commit is contained in:
parent
e22c1c01ec
commit
92ff2b7d6e
|
@ -81,9 +81,6 @@
|
|||
|
||||
-type segment() :: {offset(), _Content :: binary()}.
|
||||
|
||||
-define(STORE_SEGMENT_TIMEOUT, 10000).
|
||||
-define(ASSEMBLE_TIMEOUT, 300000).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API for app
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -212,7 +209,7 @@ on_init(PacketId, Msg, Transfer, Meta) ->
|
|||
Callback = fun(Result) ->
|
||||
?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
|
||||
end,
|
||||
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
|
||||
with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() ->
|
||||
case store_filemeta(Transfer, Meta) of
|
||||
% Stored, ack through the responder right away
|
||||
ok ->
|
||||
|
@ -245,7 +242,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
|
|||
Callback = fun(Result) ->
|
||||
?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
|
||||
end,
|
||||
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
|
||||
with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() ->
|
||||
case store_segment(Transfer, Segment) of
|
||||
ok ->
|
||||
emqx_ft_responder:ack(PacketKey, ok);
|
||||
|
@ -271,7 +268,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
|
|||
Callback = fun(Result) ->
|
||||
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
|
||||
end,
|
||||
with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() ->
|
||||
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
|
||||
case assemble(Transfer, FinalSize) of
|
||||
%% Assembling completed, ack through the responder right away
|
||||
% ok ->
|
||||
|
|
|
@ -26,6 +26,9 @@
|
|||
-export([storage/0]).
|
||||
-export([gc_interval/1]).
|
||||
-export([segments_ttl/1]).
|
||||
-export([init_timeout/0]).
|
||||
-export([store_segment_timeout/0]).
|
||||
-export([assemble_timeout/0]).
|
||||
|
||||
%% Load/Unload
|
||||
-export([
|
||||
|
@ -86,6 +89,15 @@ assert_storage(Type) ->
|
|||
error({inapplicable, Conf})
|
||||
end.
|
||||
|
||||
init_timeout() ->
|
||||
emqx_config:get([file_transfer, init_timeout]).
|
||||
|
||||
assemble_timeout() ->
|
||||
emqx_config:get([file_transfer, assemble_timeout]).
|
||||
|
||||
store_segment_timeout() ->
|
||||
emqx_config:get([file_transfer, store_segment_timeout]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -46,6 +46,33 @@ roots() -> [file_transfer].
|
|||
|
||||
fields(file_transfer) ->
|
||||
[
|
||||
{init_timeout,
|
||||
mk(
|
||||
emqx_schema:duration_ms(),
|
||||
#{
|
||||
desc => ?DESC("init_timeout"),
|
||||
required => false,
|
||||
default => "10s"
|
||||
}
|
||||
)},
|
||||
{store_segment_timeout,
|
||||
mk(
|
||||
emqx_schema:duration_ms(),
|
||||
#{
|
||||
desc => ?DESC("store_segment_timeout"),
|
||||
required => false,
|
||||
default => "5m"
|
||||
}
|
||||
)},
|
||||
{assemble_timeout,
|
||||
mk(
|
||||
emqx_schema:duration_ms(),
|
||||
#{
|
||||
desc => ?DESC("assemble_timeout"),
|
||||
required => false,
|
||||
default => "5m"
|
||||
}
|
||||
)},
|
||||
{storage,
|
||||
mk(
|
||||
hoconsc:union([
|
||||
|
|
|
@ -1,5 +1,41 @@
|
|||
emqx_ft_schema {
|
||||
|
||||
init_timeout {
|
||||
desc {
|
||||
en: "Timeout for initializing the file transfer.<br/>"
|
||||
"After reaching the timeout, `init` message will be acked with an error"
|
||||
zh: ""
|
||||
}
|
||||
label {
|
||||
en: "File Transfer Init Timeout"
|
||||
zh: ""
|
||||
}
|
||||
}
|
||||
|
||||
assemble_timeout {
|
||||
desc {
|
||||
en: "Timeout for assembling and exporting file segments into a final file.<br/>"
|
||||
"After reaching the timeout, `fin` message will be acked with an error"
|
||||
zh: ""
|
||||
}
|
||||
label {
|
||||
en: "File Assemble Timeout"
|
||||
zh: ""
|
||||
}
|
||||
}
|
||||
|
||||
store_segment_timeout {
|
||||
desc {
|
||||
en: "Timeout for storing a file segment.<br/>"
|
||||
"After reaching the timeout, message with the segment will be acked with an error"
|
||||
zh: ""
|
||||
}
|
||||
label {
|
||||
en: "Store Segment Timeout"
|
||||
zh: ""
|
||||
}
|
||||
}
|
||||
|
||||
storage {
|
||||
desc {
|
||||
en: "Storage settings for file transfer."
|
||||
|
|
Loading…
Reference in New Issue