Merge pull request #10154 from thalesmg/fix-buffer-worker-default-req-timeout

fix(buffer_worker): calculate default `resume_interval` based on `request_timeout` and `health_check_interval`
This commit is contained in:
Thales Macedo Garitezi 2023-03-22 20:21:04 -03:00 committed by GitHub
commit ddffba0355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 3 deletions

View File

@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
}
}
resume_interval {
# NOTE(review): hidden/advanced knob — when unset, the buffer worker
# derives a default from request_timeout and health_check_interval
# (see default_resume_interval in the buffer worker module).
desc {
en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window."""
zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。"""
}
label {
en: """Resume Interval"""
zh: """重试时间间隔"""
}
}
start_after_created {
desc {
en: """Whether start the resource right after created."""

View File

@ -88,6 +88,8 @@
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term().
-type request_from() :: undefined | gen_statem:from().
-type request_timeout() :: infinity | timer:time().
-type health_check_interval() :: timer:time().
-type state() :: blocked | running.
-type inflight_key() :: integer().
-type data() :: #{
@ -199,6 +201,8 @@ init({Id, Index, Opts}) ->
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval),
ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
Data = #{
id => Id,
index => Index,
@ -207,7 +211,7 @@ init({Id, Index, Opts}) ->
batch_size => BatchSize,
batch_time => BatchTime,
queue => Queue,
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
resume_interval => ResumeInterval,
tref => undefined
},
?tp(buffer_worker_init, #{id => Id, index => Index}),
@ -1679,6 +1683,17 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
end,
BatchTime.
%% Compute the default resume interval for a buffer worker.
%%
%% The resume interval controls how often a blocked worker tries to
%% unblock and resend inflight/queued requests.  It must be kept well
%% below the request timeout: if it were >= the timeout, then every
%% request queued while the worker is blocked would expire before its
%% first retry attempt.
-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time().
default_resume_interval(RequestTimeout, HealthCheckInterval) ->
    Candidate =
        case RequestTimeout of
            infinity ->
                %% No timeout to stay under; just follow the health check period.
                HealthCheckInterval;
            _ ->
                %% Stay a factor of 3 under the request timeout, but never
                %% resume more often than the health check interval.
                min(HealthCheckInterval, RequestTimeout div 3)
        end,
    %% Clamp so tiny timeouts can never yield a zero/negative interval.
    max(1, Candidate).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->

View File

@ -55,6 +55,7 @@ fields("creation_opts") ->
[
{worker_pool_size, fun worker_pool_size/1},
{health_check_interval, fun health_check_interval/1},
{resume_interval, fun resume_interval/1},
{start_after_created, fun start_after_created/1},
{start_timeout, fun start_timeout/1},
{auto_restart_interval, fun auto_restart_interval/1},
@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false;
worker_pool_size(_) -> undefined.
%% hocon schema accessor for the hidden `resume_interval' resource
%% option.  Marked hidden: it is an advanced tuning knob and, when
%% unset, the buffer worker computes a default at init time.
resume_interval(Key) ->
    case Key of
        type -> emqx_schema:duration_ms();
        hidden -> true;
        desc -> ?DESC("resume_interval");
        required -> false;
        _ -> undefined
    end.
health_check_interval(type) -> emqx_schema:duration_ms();
health_check_interval(desc) -> ?DESC("health_check_interval");
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;

View File

@ -0,0 +1,8 @@
Change the default `resume_interval` for bridges and connectors to be
the minimum of `health_check_interval` and `request_timeout / 3`.
It is also exposed as a hidden configuration parameter to allow fine-tuning.
Before this change, the default values for `resume_interval` meant
that, if a buffer ever got blocked due to resource errors or high
message volumes, then, by the time the buffer would try to resume its
normal operations, almost all requests would have timed out.

View File

@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
#{measurements := #{gauge_set := ExpectedValue}} ->
ok;
#{measurements := #{gauge_set := Value}} ->
ct:pal("events: ~p", [Events]),
ct:fail(
"gauge ~p didn't reach expected value ~p; last value: ~p",
[GaugeName, ExpectedValue, Value]
@ -972,7 +973,13 @@ t_publish_econnrefused(Config) ->
ResourceId = ?config(resource_id, Config),
%% set pipelining to 1 so that one of the 2 requests is `pending'
%% in ehttpc.
{ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}),
{ok, _} = create_bridge(
Config,
#{
<<"pipelining">> => 1,
<<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>}
}
),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
assert_empty_metrics(ResourceId),
@ -986,7 +993,10 @@ t_publish_timeout(Config) ->
%% requests are done separately.
{ok, _} = create_bridge(Config, #{
<<"pipelining">> => 1,
<<"resource_opts">> => #{<<"batch_size">> => 1}
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"resume_interval">> => <<"15s">>
}
}),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),