From 92ff2b7d6ee8a11d11f9f1d67b564fcb5c1e9ad5 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 13 Apr 2023 19:47:32 +0300 Subject: [PATCH] feat(ft): make timeouts configurable --- apps/emqx_ft/src/emqx_ft.erl | 9 +++----- apps/emqx_ft/src/emqx_ft_conf.erl | 12 ++++++++++ apps/emqx_ft/src/emqx_ft_schema.erl | 27 ++++++++++++++++++++++ rel/i18n/emqx_ft_schema.hocon | 36 +++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 6 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index c055680b0..6ac6596ff 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -81,9 +81,6 @@ -type segment() :: {offset(), _Content :: binary()}. --define(STORE_SEGMENT_TIMEOUT, 10000). --define(ASSEMBLE_TIMEOUT, 300000). - %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -212,7 +209,7 @@ on_init(PacketId, Msg, Transfer, Meta) -> Callback = fun(Result) -> ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) end, - with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() -> case store_filemeta(Transfer, Meta) of % Stored, ack through the responder right away ok -> @@ -245,7 +242,7 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> Callback = fun(Result) -> ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) end, - with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() -> case store_segment(Transfer, Segment) of ok -> emqx_ft_responder:ack(PacketKey, ok); @@ -271,7 +268,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> Callback = fun(Result) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, - with_responder(FinPacketKey, Callback, ?ASSEMBLE_TIMEOUT, fun() -> + with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> case assemble(Transfer, FinalSize) of %% Assembling completed, ack through the responder right away % ok -> diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 5ca0b85ed..14a79b94c 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -26,6 +26,9 @@ -export([storage/0]). -export([gc_interval/1]). -export([segments_ttl/1]). +-export([init_timeout/0]). +-export([store_segment_timeout/0]). +-export([assemble_timeout/0]). %% Load/Unload -export([ @@ -86,6 +89,15 @@ assert_storage(Type) -> error({inapplicable, Conf}) end. +init_timeout() -> + emqx_config:get([file_transfer, init_timeout]). + +assemble_timeout() -> + emqx_config:get([file_transfer, assemble_timeout]). + +store_segment_timeout() -> + emqx_config:get([file_transfer, store_segment_timeout]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index beb43adc3..bc780874a 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -46,6 +46,33 @@ roots() -> [file_transfer]. fields(file_transfer) -> [ + {init_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("init_timeout"), + required => false, + default => "10s" + } + )}, + {store_segment_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("store_segment_timeout"), + required => false, + default => "5m" + } + )}, + {assemble_timeout, + mk( + emqx_schema:duration_ms(), + #{ + desc => ?DESC("assemble_timeout"), + required => false, + default => "5m" + } + )}, {storage, mk( hoconsc:union([ diff --git a/rel/i18n/emqx_ft_schema.hocon b/rel/i18n/emqx_ft_schema.hocon index 15cf17a39..9df02e920 100644 --- a/rel/i18n/emqx_ft_schema.hocon +++ b/rel/i18n/emqx_ft_schema.hocon @@ -1,5 +1,41 @@ emqx_ft_schema { + init_timeout { + desc { + en: "Timeout for initializing the file transfer.
" + "After reaching the timeout, `init` message will be acked with an error" + zh: "" + } + label { + en: "File Transfer Init Timeout" + zh: "" + } + } + + assemble_timeout { + desc { + en: "Timeout for assembling and exporting file segments into a final file.
" + "After reaching the timeout, `fin` message will be acked with an error" + zh: "" + } + label { + en: "File Assemble Timeout" + zh: "" + } + } + + store_segment_timeout { + desc { + en: "Timeout for storing a file segment.
" + "After reaching the timeout, message with the segment will be acked with an error" + zh: "" + } + label { + en: "Store Segment Timeout" + zh: "" + } + } + storage { desc { en: "Storage settings for file transfer."