diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 600289b1d..57b109497 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + resume_interval { + desc { + en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window.""" + zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。""" + } + label { + en: """Resume Interval""" + zh: """重试时间间隔""" + } + } + start_after_created { desc { en: """Whether start the resource right after created.""" diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..648587c25 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -88,6 +88,8 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). +-type request_timeout() :: infinity | timer:time(). +-type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). -type data() :: #{ @@ -199,6 +201,8 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), + DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), Data = #{ id => Id, index => Index, @@ -207,7 +211,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => BatchTime, queue => Queue, - resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), + resume_interval => ResumeInterval, tref => undefined }, ?tp(buffer_worker_init, #{id => Id, index => Index}), @@ -1679,6 +1683,17 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. +%% The request timeout should be greater than the resume interval, as +%% it defines how often the buffer worker tries to unblock. If request +%% timeout is <= resume interval and the buffer worker is ever +%% blocked, than all queued requests will basically fail without being +%% attempted. +-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> + max(1, HealthCheckInterval); +default_resume_interval(RequestTimeout, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTimeout div 3)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fdd65bc3c..b9ed176fe 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -55,6 +55,7 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, + {resume_interval, fun resume_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, @@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; worker_pool_size(_) -> undefined. +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(hidden) -> true; +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(required) -> false; +resume_interval(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md new file mode 100644 index 000000000..24bc4bae1 --- /dev/null +++ b/changes/ce/fix-10154.en.md @@ -0,0 +1,8 @@ +Change the default `resume_interval` for bridges and connectors to be +the minimum of `health_check_interval` and `request_timeout / 3`. +Also exposes it as a hidden configuration to allow fine tuning. + +Before this change, the default values for `resume_interval` meant +that, if a buffer ever got blocked due to resource errors or high +message volumes, then, by the time the buffer would try to resume its +normal operations, almost all requests would have timed out. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 8424ddff0..f9968ee96 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> #{measurements := #{gauge_set := ExpectedValue}} -> ok; #{measurements := #{gauge_set := Value}} -> + ct:pal("events: ~p", [Events]), ct:fail( "gauge ~p didn't reach expected value ~p; last value: ~p", [GaugeName, ExpectedValue, Value] @@ -972,7 +973,13 @@ t_publish_econnrefused(Config) -> ResourceId = ?config(resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. - {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}), + {ok, _} = create_bridge( + Config, + #{ + <<"pipelining">> => 1, + <<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>} + } + ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), assert_empty_metrics(ResourceId), @@ -986,7 +993,10 @@ t_publish_timeout(Config) -> %% requests are done separately. {ok, _} = create_bridge(Config, #{ <<"pipelining">> => 1, - <<"resource_opts">> => #{<<"batch_size">> => 1} + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"resume_interval">> => <<"15s">> + } }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),