From 20414d737323aa6a35906f49697fd72fbe990ef5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Mar 2023 13:33:52 -0300 Subject: [PATCH 1/3] fix(buffer_worker): check request timeout and health check interval Fixes https://emqx.atlassian.net/browse/EMQX-9099 The default value for `request_timeout` is 15 seconds, and the default resume interval is also 15 seconds (the health check timeout, if `resume_interval` is not explicitly given). This means that, in practice, if a buffer worker ever gets into the blocked state, then almost all requests will timeout. Proposed improvement: - `request_timeout` should by default be twice as much as health_check_interval. - Emit a alarm if `request_timeout` is not greater than `health_check_interval`. --- .../i18n/emqx_resource_schema_i18n.conf | 4 +- apps/emqx_resource/include/emqx_resource.hrl | 5 +- .../src/emqx_resource_buffer_worker.erl | 38 ++++++ .../src/emqx_resource_manager.erl | 1 + .../test/emqx_resource_SUITE.erl | 126 ++++++++++++++++++ changes/ce/fix-10154.en.md | 7 + 6 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10154.en.md diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index fb6b2eb06..2e5cf96e8 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -102,8 +102,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" - zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired. We recommend setting this timeout to be at least twice the health check interval, so that the buffer has the chance to recover if too many requests get enqueued.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。我们建议将这个超时设置为健康检查间隔的至少两倍,这样,如果有太多的请求被排队,缓冲区就有机会恢复。""" } label { en: """Request Expiry""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index be570e694..8033ed660 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -91,7 +91,10 @@ -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). +%% Note: this should be greater than the health check timeout; +%% otherwise, if the buffer worker is ever blocked, than all queued +%% requests will basically fail without being attempted. +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(30)). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..05622bdd7 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -56,6 +56,8 @@ -export([clear_disk_queue_dir/2]). +-export([deactivate_bad_request_timeout_alarm/1]). + -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(COLLECT_REQ_LIMIT, 1000). @@ -88,6 +90,8 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). +-type request_timeout() :: infinity | timer:time(). +-type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). -type data() :: #{ @@ -199,6 +203,7 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), + maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval), Data = #{ id => Id, index => Index, @@ -1679,6 +1684,39 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. +%% The request timeout should be greater than the health check +%% timeout, health timeout defines how often the buffer worker tries +%% to unblock. If request timeout is <= health check timeout and the +%% buffer worker is ever blocked, than all queued requests will +%% basically fail without being attempted. +-spec maybe_toggle_bad_request_timeout_alarm( + resource_id(), request_timeout(), health_check_interval() +) -> ok. +maybe_toggle_bad_request_timeout_alarm(Id, _RequestTimeout = infinity, _HealthCheckInterval) -> + deactivate_bad_request_timeout_alarm(Id), + ok; +maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval) -> + case RequestTimeout > HealthCheckInterval of + true -> + deactivate_bad_request_timeout_alarm(Id), + ok; + false -> + _ = emqx_alarm:activate( + bad_request_timeout_alarm_id(Id), + #{resource_id => Id, reason => bad_request_timeout}, + <<"Request timeout should be greater than health check timeout: ", Id/binary>> + ), + ok + end. + +-spec deactivate_bad_request_timeout_alarm(resource_id()) -> ok. +deactivate_bad_request_timeout_alarm(Id) -> + _ = emqx_alarm:ensure_deactivated(bad_request_timeout_alarm_id(Id)), + ok. + +bad_request_timeout_alarm_id(Id) -> + <<"bad_request_timeout:", Id/binary>>. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 40f9fe1ab..2bdc67a4d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -506,6 +506,7 @@ handle_remove_event(From, ClearMetrics, Data) -> true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, + emqx_resource_buffer_worker:deactivate_bad_request_timeout_alarm(Data#data.id), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index ff7e1d347..25f4a6d77 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2554,6 +2554,132 @@ do_t_recursive_flush() -> ), ok. +%% Check that we raise an alarm if a bad request timeout config is +%% issued. Request timeout should be greater than health check +%% timeout. +t_bad_request_timeout_alarm(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + + %% 1) Same values. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 1_000, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ExpectedMessage = + <<"Request timeout should be greater than health check timeout: ", ?ID/binary>>, + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + %% The unexpected termination of one of the buffer workers should + %% not turn the alarm off. + [Pid, _ | _] = emqx_resource_buffer_worker_sup:worker_pids(?ID), + MRef = monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after 300 -> + ct:fail("buffer worker didn't die") + end, + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 2) Request timeout < health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 999, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 2) Request timeout < health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 999, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 3) Request timeout > health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 1_001, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md new file mode 100644 index 000000000..83a729360 --- /dev/null +++ b/changes/ce/fix-10154.en.md @@ -0,0 +1,7 @@ +Change the default `request_timeout` for bridges and connectors to be +twice the default `health_check_interval`. + +Before this change, the default values for those two options meant +that, if a buffer ever got blocked due to resource errors or high +message volumes, then, by the time the buffer would try to resume its +normal operations, almost all requests would have timed out. From 61cb03b45a427fe361a79edc807d5e1e791de132 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Mar 2023 14:32:50 -0300 Subject: [PATCH 2/3] fix(buffer_worker): change the default `resume_interval` value and expose it as hidden config Also removes the previously added alarm for request timeout. There are situations where having a short request timeout and a long health check interval make sense, so we don't want to alarm the user for those situations. Instead, we automatically attempt to set a reasonable `resume_interval` value. --- .../i18n/emqx_resource_schema_i18n.conf | 15 ++- apps/emqx_resource/include/emqx_resource.hrl | 5 +- .../src/emqx_resource_buffer_worker.erl | 49 ++----- .../src/emqx_resource_manager.erl | 1 - .../src/schema/emqx_resource_schema.erl | 7 + .../test/emqx_resource_SUITE.erl | 126 ------------------ changes/ce/fix-10154.en.md | 7 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 14 +- 8 files changed, 50 insertions(+), 174 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 2e5cf96e8..eb4f00ac7 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + resume_interval { + desc { + en: """The interval at which a resource will retry inflight requests.""" + zh: """资源重试机内请求的间隔时间。""" + } + label { + en: """Resume Interval""" + zh: """复职时间间隔""" + } + } + start_after_created { desc { en: """Whether start the resource right after created.""" @@ -102,8 +113,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired. We recommend setting this timeout to be at least twice the health check interval, so that the buffer has the chance to recover if too many requests get enqueued.""" - zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。我们建议将这个超时设置为健康检查间隔的至少两倍,这样,如果有太多的请求被排队,缓冲区就有机会恢复。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" } label { en: """Request Expiry""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 8033ed660..be570e694 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -91,10 +91,7 @@ -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). -%% Note: this should be greater than the health check timeout; -%% otherwise, if the buffer worker is ever blocked, than all queued -%% requests will basically fail without being attempted. --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(30)). +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 05622bdd7..648587c25 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -56,8 +56,6 @@ -export([clear_disk_queue_dir/2]). --export([deactivate_bad_request_timeout_alarm/1]). - -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(COLLECT_REQ_LIMIT, 1000). @@ -203,7 +201,8 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), - maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval), + DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), Data = #{ id => Id, index => Index, @@ -212,7 +211,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => BatchTime, queue => Queue, - resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), + resume_interval => ResumeInterval, tref => undefined }, ?tp(buffer_worker_init, #{id => Id, index => Index}), @@ -1684,38 +1683,16 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. -%% The request timeout should be greater than the health check -%% timeout, health timeout defines how often the buffer worker tries -%% to unblock. If request timeout is <= health check timeout and the -%% buffer worker is ever blocked, than all queued requests will -%% basically fail without being attempted. --spec maybe_toggle_bad_request_timeout_alarm( - resource_id(), request_timeout(), health_check_interval() -) -> ok. -maybe_toggle_bad_request_timeout_alarm(Id, _RequestTimeout = infinity, _HealthCheckInterval) -> - deactivate_bad_request_timeout_alarm(Id), - ok; -maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval) -> - case RequestTimeout > HealthCheckInterval of - true -> - deactivate_bad_request_timeout_alarm(Id), - ok; - false -> - _ = emqx_alarm:activate( - bad_request_timeout_alarm_id(Id), - #{resource_id => Id, reason => bad_request_timeout}, - <<"Request timeout should be greater than health check timeout: ", Id/binary>> - ), - ok - end. - --spec deactivate_bad_request_timeout_alarm(resource_id()) -> ok. -deactivate_bad_request_timeout_alarm(Id) -> - _ = emqx_alarm:ensure_deactivated(bad_request_timeout_alarm_id(Id)), - ok. - -bad_request_timeout_alarm_id(Id) -> - <<"bad_request_timeout:", Id/binary>>. +%% The request timeout should be greater than the resume interval, as +%% it defines how often the buffer worker tries to unblock. If request +%% timeout is <= resume interval and the buffer worker is ever +%% blocked, than all queued requests will basically fail without being +%% attempted. +-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> + max(1, HealthCheckInterval); +default_resume_interval(RequestTimeout, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTimeout div 3)). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2bdc67a4d..40f9fe1ab 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -506,7 +506,6 @@ handle_remove_event(From, ClearMetrics, Data) -> true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - emqx_resource_buffer_worker:deactivate_bad_request_timeout_alarm(Data#data.id), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fdd65bc3c..b9ed176fe 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -55,6 +55,7 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, + {resume_interval, fun resume_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, @@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; worker_pool_size(_) -> undefined. +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(hidden) -> true; +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(required) -> false; +resume_interval(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 25f4a6d77..ff7e1d347 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2554,132 +2554,6 @@ do_t_recursive_flush() -> ), ok. -%% Check that we raise an alarm if a bad request timeout config is -%% issued. Request timeout should be greater than health check -%% timeout. -t_bad_request_timeout_alarm(_Config) -> - emqx_connector_demo:set_callback_mode(async_if_possible), - - %% 1) Same values. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 1_000, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ExpectedMessage = - <<"Request timeout should be greater than health check timeout: ", ?ID/binary>>, - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - %% The unexpected termination of one of the buffer workers should - %% not turn the alarm off. - [Pid, _ | _] = emqx_resource_buffer_worker_sup:worker_pids(?ID), - MRef = monitor(process, Pid), - exit(Pid, kill), - receive - {'DOWN', MRef, process, Pid, _} -> - ok - after 300 -> - ct:fail("buffer worker didn't die") - end, - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 2) Request timeout < health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 999, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 2) Request timeout < health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 999, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 3) Request timeout > health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 1_001, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - ok. - %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md index 83a729360..24bc4bae1 100644 --- a/changes/ce/fix-10154.en.md +++ b/changes/ce/fix-10154.en.md @@ -1,7 +1,8 @@ -Change the default `request_timeout` for bridges and connectors to be -twice the default `health_check_interval`. +Change the default `resume_interval` for bridges and connectors to be +the minimum of `health_check_interval` and `request_timeout / 3`. +Also exposes it as a hidden configuration to allow fine tuning. -Before this change, the default values for those two options meant +Before this change, the default values for `resume_interval` meant that, if a buffer ever got blocked due to resource errors or high message volumes, then, by the time the buffer would try to resume its normal operations, almost all requests would have timed out. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 452b7a4d2..55dfa5555 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> #{measurements := #{gauge_set := ExpectedValue}} -> ok; #{measurements := #{gauge_set := Value}} -> + ct:pal("events: ~p", [Events]), ct:fail( "gauge ~p didn't reach expected value ~p; last value: ~p", [GaugeName, ExpectedValue, Value] @@ -972,7 +973,13 @@ t_publish_econnrefused(Config) -> ResourceId = ?config(resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. - {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}), + {ok, _} = create_bridge( + Config, + #{ + <<"pipelining">> => 1, + <<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>} + } + ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), assert_empty_metrics(ResourceId), @@ -986,7 +993,10 @@ t_publish_timeout(Config) -> %% requests are done separately. {ok, _} = create_bridge(Config, #{ <<"pipelining">> => 1, - <<"resource_opts">> => #{<<"batch_size">> => 1} + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"resume_interval">> => <<"15s">> + } }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), From 8844b22c809ae7495243ed8f9164d84cf6303513 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 15:32:09 -0300 Subject: [PATCH 3/3] docs: improve descriptions Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index eb4f00ac7..aedcabc70 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -47,12 +47,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise resume_interval { desc { - en: """The interval at which a resource will retry inflight requests.""" - zh: """资源重试机内请求的间隔时间。""" + en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window.""" + zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。""" } label { en: """Resume Interval""" - zh: """复职时间间隔""" + zh: """重试时间间隔""" } }