From e9ffabf9369117b9dd6b4cb5f815ba8bead4edaa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 10:48:06 -0300 Subject: [PATCH 01/10] fix(buffer_worker): add batch time automatic adjustment To avoid message loss due to misconfigurations, we adjust `batch_time` based on `request_timeout`. If `batch_time` > `request_timeout`, all requests will timeout before being sent if the message rate is low. Even worse if `pool_size` is high. We cap `batch_time` at `request_timeout div 2` as a rule of thumb. --- .../src/emqx_resource_buffer_worker.erl | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index a8ae4454d..ac22e1c48 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -196,13 +196,16 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize, Id, Index), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), + RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), Data = #{ id => Id, index => Index, inflight_tid => InflightTID, async_workers => #{}, batch_size => BatchSize, - batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + batch_time => BatchTime, queue => Queue, resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined @@ -1639,3 +1642,46 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. -else. do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). -endif. + +%% To avoid message loss due to misconfigurations, we adjust +%% `batch_time' based on `request_timeout'. If `batch_time' > +%% `request_timeout', all requests will timeout before being sent if +%% the message rate is low. Even worse if `pool_size' is high. +%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. +adjust_batch_time(Id, RequestTimeout, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), + case BatchTime =:= BatchTime0 of + false -> + ?SLOG(info, #{ + id => Id, + msg => adjusting_buffer_worker_batch_time, + new_batch_time => BatchTime + }); + true -> + ok + end, + BatchTime. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +adjust_batch_time_test_() -> + %% just for logging + Id = some_id, + [ + {"batch time smaller than request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 500, 100) + )}, + {"batch time equal to request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 200, 100) + )}, + {"batch time greater than request_time/2", + ?_assertEqual( + 50, + adjust_batch_time(Id, 100, 100) + )} + ]. +-endif. From 167b7a212f76eb248448b35d41fd1620b8ac7b2b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 13:39:59 -0300 Subject: [PATCH 02/10] refactor(buffer_worker): avoid starting 0-time timers --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index ac22e1c48..0f65b21f4 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1540,6 +1540,12 @@ clear_disk_queue_dir(Id, Index) -> ensure_flush_timer(Data = #{batch_time := T}) -> ensure_flush_timer(Data, T). +ensure_flush_timer(Data = #{tref := undefined}, 0) -> + %% if the batch_time is 0, we don't need to start a timer, which + %% can be costly at high rates. + Ref = make_ref(), + self() ! {flush, {Ref, Ref}}, + Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), From 9b087a21f51dc504172b3adcfeb8906ff3d842ac Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 11:49:41 -0300 Subject: [PATCH 03/10] fix(gcp_pubsub): remove conflicting `request_timeout` option Since the buffer worker schema already contains that configuration, having it two places can lead to quite confusing behavior. --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl | 9 --------- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 1 - .../src/emqx_ee_connector_gcp_pubsub.erl | 4 ++-- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index e00483839..ea9cedf4d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -79,15 +79,6 @@ fields(bridge_config) -> desc => ?DESC("max_retries") } )}, - {request_timeout, - sc( - emqx_schema:duration_ms(), - #{ - required => false, - default => <<"15s">>, - desc => ?DESC("request_timeout") - } - )}, {payload_template, sc( binary(), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 222acb77b..452b7a4d2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -282,7 +282,6 @@ gcp_pubsub_config(Config) -> "bridges.gcp_pubsub.~s {\n" " enable = true\n" " connect_timeout = 1s\n" - " request_timeout = 1s\n" " service_account_json = ~s\n" " payload_template = ~p\n" " pubsub_topic = ~s\n" diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 898c36fe0..f07cbceab 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -33,7 +33,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - request_timeout := emqx_schema:duration_ms(), + resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -71,7 +71,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - request_timeout := RequestTimeout + resource_opts := #{request_timeout := RequestTimeout} } = Config ) -> ?SLOG(info, #{ From f95a30ae897bceeb4eafe5725f9f502695b244f6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 15:01:57 -0300 Subject: [PATCH 04/10] fix(webhook): convert `request_timeout`s in root and resource_opts --- .../src/schema/emqx_bridge_schema.erl | 59 +++++++++++++++++-- .../test/emqx_bridge_api_SUITE.erl | 29 +++++++++ .../emqx_bridge_compatible_config_tests.erl | 58 +++++++++++++++++- .../src/emqx_resource_buffer_worker.erl | 2 +- 4 files changed, 141 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ed2baec8f..6b96e5150 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -140,11 +141,7 @@ fields(bridges) -> #{ desc => ?DESC("bridges_webhook"), required => false, - converter => fun(X, _HoconOpts) -> - emqx_bridge_compatible_config:upgrade_pre_ee( - X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ) - end + converter => fun webhook_bridge_converter/2 } )}, {mqtt, @@ -212,3 +209,55 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +webhook_bridge_converter(Conf0, _HoconOpts) -> + Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + ), + case Conf1 of + undefined -> + undefined; + _ -> + do_convert_webhook_config(Conf1) + end. + +do_convert_webhook_config( + #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf +) -> + %% ok: same values + Conf; +do_convert_webhook_config( + #{ + <<"request_timeout">> := ReqTRootRaw, + <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} + } = Conf0 +) -> + %% different values; we set them to the same, if they are valid + %% durations + MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), + MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), + case {MReqTRoot, MReqTResource} of + {{ok, ReqTRoot}, {ok, ReqTResource}} -> + {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), + ?SLOG( + debug, + #{ + msg => adjusting_webhook_bridge_request_time, + new_value => ReqTRaw + } + ), + Conf1 = emqx_map_lib:deep_merge( + Conf0, + #{ + <<"request_timeout">> => ReqTRaw, + <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} + } + ), + Conf1; + _ -> + %% invalid values; let the type checker complain about + %% that. + Conf0 + end; +do_convert_webhook_config(Conf) -> + Conf. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5f863ed63..d242111dc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -818,6 +818,35 @@ t_metrics(Config) -> ), ok. +%% request_timeout in bridge root should match request_timeout in +%% resource_opts. +t_inconsistent_webhook_request_timeouts(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + BadBridgeParams = + emqx_map_lib:deep_merge( + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name), + #{ + <<"request_timeout">> => <<"1s">>, + <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + } + ), + {ok, 201, RawResponse} = request( + post, + uri(["bridges"]), + BadBridgeParams + ), + %% note: same value on both fields + ?assertMatch( + #{ + <<"request_timeout">> := <<"2s">>, + <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} + }, + emqx_json:decode(RawResponse, [return_maps]) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 5e0b4912f..acafb84ca 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -28,6 +28,7 @@ empty_config_test() -> webhook_config_test() -> Conf1 = parse(webhook_v5011_hocon()), Conf2 = parse(full_webhook_v5011_hocon()), + Conf3 = parse(full_webhook_v5019_hocon()), ?assertMatch( #{ @@ -59,6 +60,26 @@ webhook_config_test() -> check(Conf2) ), + %% the converter should pick the greater of the two + %% request_timeouts and place them in the root and inside + %% resource_opts. + ?assertMatch( + #{ + <<"bridges">> := #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"request_timeout">> := 60_000, + <<"resource_opts">> := #{<<"request_timeout">> := 60_000}, + <<"body">> := <<"${payload}">> + } + } + } + }, + check(Conf3) + ), + ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> @@ -124,7 +145,7 @@ bridges{ max_retries = 3 method = \"get\" pool_size = 4 - request_timeout = \"5s\" + request_timeout = \"15s\" ssl {enable = false, verify = \"verify_peer\"} url = \"http://localhost:8080\" } @@ -164,6 +185,41 @@ full_webhook_v5011_hocon() -> "}\n" "". +%% does not contain direction +full_webhook_v5019_hocon() -> + "" + "\n" + "bridges{\n" + " webhook {\n" + " the_name{\n" + " body = \"${payload}\"\n" + " connect_timeout = \"5s\"\n" + " enable_pipelining = 100\n" + " headers {\"content-type\" = \"application/json\"}\n" + " max_retries = 3\n" + " method = \"get\"\n" + " pool_size = 4\n" + " pool_type = \"random\"\n" + " request_timeout = \"1m\"\n" + " resource_opts = {\n" + " request_timeout = \"7s\"\n" + " }\n" + " ssl {\n" + " ciphers = \"\"\n" + " depth = 10\n" + " enable = false\n" + " reuse_sessions = true\n" + " secure_renegotiate = true\n" + " user_lookup_fun = \"emqx_tls_psk:lookup\"\n" + " verify = \"verify_peer\"\n" + " versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n" + " }\n" + " url = \"http://localhost:8080\"\n" + " }\n" + " }\n" + "}\n" + "". + %% erlfmt-ignore %% this is a generated from v5.0.11 mqtt_v5011_hocon() -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0f65b21f4..58a97b5fe 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1544,7 +1544,7 @@ ensure_flush_timer(Data = #{tref := undefined}, 0) -> %% if the batch_time is 0, we don't need to start a timer, which %% can be costly at high rates. Ref = make_ref(), - self() ! {flush, {Ref, Ref}}, + self() ! {flush, Ref}, Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), From 9825998207069c50ddf3d3aaa615249d01eb9e6c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:56:19 -0300 Subject: [PATCH 05/10] chore(gcp_pubsub): just deprecate `request_timeout` instead of removing --- .../emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf | 4 ++-- .../emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf index af2a93f82..b8fa3b43a 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf @@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub { request_timeout { desc { - en: "HTTP request timeout." - zh: "HTTP 请求超时。" + en: "Deprecated: Configure the request timeout in the buffer settings." + zh: "废弃的。在缓冲区设置中配置请求超时。" } label: { en: "Request Timeout" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index ea9cedf4d..352a7163a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -79,6 +79,16 @@ fields(bridge_config) -> desc => ?DESC("max_retries") } )}, + {request_timeout, + sc( + emqx_schema:duration_ms(), + #{ + required => false, + deprecated => {since, "e5.0.1"}, + default => <<"15s">>, + desc => ?DESC("request_timeout") + } + )}, {payload_template, sc( binary(), From e9d3fc511fdfc4d37fb14ca94bb8cc3c0e32e902 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:56:44 -0300 Subject: [PATCH 06/10] chore(buffer_worker): change default `batch_time` to 0 and improve docs --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 8 ++++---- apps/emqx_resource/include/emqx_resource.hrl | 4 ++-- lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl | 4 +++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index f3ac2fd97..35e2905df 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only.""" batch_time { desc { - en: """Maximum batch waiting interval.""" - zh: """最大批量请求等待时间。""" + en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage.""" + zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。""" } label { - en: """Batch time""" - zh: """批量等待间隔""" + en: """Max Batch Wait Time""" + zh: """批量等待最大间隔""" } } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa7f2eb38..be570e694 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -97,8 +97,8 @@ -define(DEFAULT_BATCH_SIZE, 1). %% milliseconds --define(DEFAULT_BATCH_TIME, 20). --define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>). +-define(DEFAULT_BATCH_TIME, 0). +-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>). %% count -define(DEFAULT_INFLIGHT, 100). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 67a9b4a05..4eeebfaf8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -540,7 +540,9 @@ resource_configs() -> <<"query_mode">> => <<"sync">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), - <<"start_timeout">> => <<"15s">> + <<"start_timeout">> => <<"15s">>, + <<"batch_time">> => <<"4s">>, + <<"request_timeout">> => <<"30s">> } }. From e17ad320eeacefbabf75d059390915bb14ac5936 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:57:10 -0300 Subject: [PATCH 07/10] fix(bridge): do not log in converter --- apps/emqx_bridge/src/schema/emqx_bridge_schema.erl | 7 ------- 1 file changed, 7 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 6b96e5150..74d2a5ca1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -239,13 +239,6 @@ do_convert_webhook_config( case {MReqTRoot, MReqTResource} of {{ok, ReqTRoot}, {ok, ReqTResource}} -> {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), - ?SLOG( - debug, - #{ - msg => adjusting_webhook_bridge_request_time, - new_value => ReqTRaw - } - ), Conf1 = emqx_map_lib:deep_merge( Conf0, #{ From 0e707e837f747dda095ce776ebe094bcd2f8e725 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 10:14:13 -0300 Subject: [PATCH 08/10] docs(buffer_worker): improve description of `request_timeout` --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 35e2905df..fb6b2eb06 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out.""" - zh: """请求的超时。 如果query_modesync,对资源的调用将在超时前被阻断这一时间。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" } label { - en: """Request timeout""" - zh: """请求超时""" + en: """Request Expiry""" + zh: """请求超期""" } } From 18ab7ed19792252f2159ee8b866555fa86f235d1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 10:36:00 -0300 Subject: [PATCH 09/10] chore: bump app vsns --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index cb26c7f09..0cc013099 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index c30c927f2..05d893a79 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, From eef65fba606d4d2a916e41cc00419d2782b87449 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 13:19:20 -0300 Subject: [PATCH 10/10] fix(buffer_worker): handle `request_timeout = infinity` case The current schema allows `infinity` for `request_timeout`, so we have to take that into account. It's not currently possible to set `batch_time = infinity`, so there's no need to treat that case. --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 58a97b5fe..d5a50f351 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1654,6 +1654,8 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, %% `request_timeout', all requests will timeout before being sent if %% the message rate is low. Even worse if `pool_size' is high. %% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> + BatchTime0; adjust_batch_time(Id, RequestTimeout, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), case BatchTime =:= BatchTime0 of @@ -1688,6 +1690,11 @@ adjust_batch_time_test_() -> ?_assertEqual( 50, adjust_batch_time(Id, 100, 100) + )}, + {"batch time smaller than request_time/2 (request_time = infinity)", + ?_assertEqual( + 100, + adjust_batch_time(Id, infinity, 100) )} ]. -endif.