fix(buffer_worker): check request timeout and health check interval
Fixes https://emqx.atlassian.net/browse/EMQX-9099 The default value for `request_timeout` is 15 seconds, and the default resume interval is also 15 seconds (the health check timeout, if `resume_interval` is not explicitly given). This means that, in practice, if a buffer worker ever gets into the blocked state, then almost all requests will timeout. Proposed improvement: - `request_timeout` should by default be twice as much as health_check_interval. - Emit a alarm if `request_timeout` is not greater than `health_check_interval`.
This commit is contained in:
parent
ef8f764154
commit
20414d7373
|
@ -102,8 +102,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
|
|||
|
||||
request_timeout {
|
||||
desc {
|
||||
en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
|
||||
zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。"""
|
||||
en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired. We recommend setting this timeout to be at least twice the health check interval, so that the buffer has the chance to recover if too many requests get enqueued."""
|
||||
zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。我们建议将这个超时设置为健康检查间隔的至少两倍,这样,如果有太多的请求被排队,缓冲区就有机会恢复。"""
|
||||
}
|
||||
label {
|
||||
en: """Request Expiry"""
|
||||
|
|
|
@ -91,7 +91,10 @@
|
|||
-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024).
|
||||
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).
|
||||
|
||||
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)).
|
||||
%% Note: this should be greater than the health check timeout;
|
||||
%% otherwise, if the buffer worker is ever blocked, than all queued
|
||||
%% requests will basically fail without being attempted.
|
||||
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(30)).
|
||||
|
||||
%% count
|
||||
-define(DEFAULT_BATCH_SIZE, 1).
|
||||
|
|
|
@ -56,6 +56,8 @@
|
|||
|
||||
-export([clear_disk_queue_dir/2]).
|
||||
|
||||
-export([deactivate_bad_request_timeout_alarm/1]).
|
||||
|
||||
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||
|
||||
-define(COLLECT_REQ_LIMIT, 1000).
|
||||
|
@ -88,6 +90,8 @@
|
|||
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
|
||||
-type request() :: term().
|
||||
-type request_from() :: undefined | gen_statem:from().
|
||||
-type request_timeout() :: infinity | timer:time().
|
||||
-type health_check_interval() :: timer:time().
|
||||
-type state() :: blocked | running.
|
||||
-type inflight_key() :: integer().
|
||||
-type data() :: #{
|
||||
|
@ -199,6 +203,7 @@ init({Id, Index, Opts}) ->
|
|||
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
|
||||
BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
|
||||
BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
|
||||
maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval),
|
||||
Data = #{
|
||||
id => Id,
|
||||
index => Index,
|
||||
|
@ -1679,6 +1684,39 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
|
|||
end,
|
||||
BatchTime.
|
||||
|
||||
%% The request timeout should be greater than the health check
|
||||
%% timeout, health timeout defines how often the buffer worker tries
|
||||
%% to unblock. If request timeout is <= health check timeout and the
|
||||
%% buffer worker is ever blocked, than all queued requests will
|
||||
%% basically fail without being attempted.
|
||||
-spec maybe_toggle_bad_request_timeout_alarm(
|
||||
resource_id(), request_timeout(), health_check_interval()
|
||||
) -> ok.
|
||||
maybe_toggle_bad_request_timeout_alarm(Id, _RequestTimeout = infinity, _HealthCheckInterval) ->
|
||||
deactivate_bad_request_timeout_alarm(Id),
|
||||
ok;
|
||||
maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval) ->
|
||||
case RequestTimeout > HealthCheckInterval of
|
||||
true ->
|
||||
deactivate_bad_request_timeout_alarm(Id),
|
||||
ok;
|
||||
false ->
|
||||
_ = emqx_alarm:activate(
|
||||
bad_request_timeout_alarm_id(Id),
|
||||
#{resource_id => Id, reason => bad_request_timeout},
|
||||
<<"Request timeout should be greater than health check timeout: ", Id/binary>>
|
||||
),
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec deactivate_bad_request_timeout_alarm(resource_id()) -> ok.
|
||||
deactivate_bad_request_timeout_alarm(Id) ->
|
||||
_ = emqx_alarm:ensure_deactivated(bad_request_timeout_alarm_id(Id)),
|
||||
ok.
|
||||
|
||||
bad_request_timeout_alarm_id(Id) ->
|
||||
<<"bad_request_timeout:", Id/binary>>.
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
adjust_batch_time_test_() ->
|
||||
|
|
|
@ -506,6 +506,7 @@ handle_remove_event(From, ClearMetrics, Data) ->
|
|||
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
|
||||
false -> ok
|
||||
end,
|
||||
emqx_resource_buffer_worker:deactivate_bad_request_timeout_alarm(Data#data.id),
|
||||
{stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}.
|
||||
|
||||
start_resource(Data, From) ->
|
||||
|
|
|
@ -2554,6 +2554,132 @@ do_t_recursive_flush() ->
|
|||
),
|
||||
ok.
|
||||
|
||||
%% Check that we raise an alarm if a bad request timeout config is
|
||||
%% issued. Request timeout should be greater than health check
|
||||
%% timeout.
|
||||
t_bad_request_timeout_alarm(_Config) ->
|
||||
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||
|
||||
%% 1) Same values.
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
request_timeout => 1_000,
|
||||
health_check_interval => 1_000,
|
||||
worker_pool_size => 2
|
||||
}
|
||||
),
|
||||
ExpectedMessage =
|
||||
<<"Request timeout should be greater than health check timeout: ", ?ID/binary>>,
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
message := ExpectedMessage,
|
||||
details := #{reason := bad_request_timeout, resource_id := ?ID},
|
||||
deactivate_at := infinity
|
||||
}
|
||||
],
|
||||
emqx_alarm:get_alarms(activated)
|
||||
),
|
||||
%% The unexpected termination of one of the buffer workers should
|
||||
%% not turn the alarm off.
|
||||
[Pid, _ | _] = emqx_resource_buffer_worker_sup:worker_pids(?ID),
|
||||
MRef = monitor(process, Pid),
|
||||
exit(Pid, kill),
|
||||
receive
|
||||
{'DOWN', MRef, process, Pid, _} ->
|
||||
ok
|
||||
after 300 ->
|
||||
ct:fail("buffer worker didn't die")
|
||||
end,
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
message := ExpectedMessage,
|
||||
details := #{reason := bad_request_timeout, resource_id := ?ID},
|
||||
deactivate_at := infinity
|
||||
}
|
||||
],
|
||||
emqx_alarm:get_alarms(activated)
|
||||
),
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
?assertEqual([], emqx_alarm:get_alarms(activated)),
|
||||
|
||||
%% 2) Request timeout < health check interval.
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
request_timeout => 999,
|
||||
health_check_interval => 1_000,
|
||||
worker_pool_size => 2
|
||||
}
|
||||
),
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
message := ExpectedMessage,
|
||||
details := #{reason := bad_request_timeout, resource_id := ?ID},
|
||||
deactivate_at := infinity
|
||||
}
|
||||
],
|
||||
emqx_alarm:get_alarms(activated)
|
||||
),
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
?assertEqual([], emqx_alarm:get_alarms(activated)),
|
||||
|
||||
%% 2) Request timeout < health check interval.
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
request_timeout => 999,
|
||||
health_check_interval => 1_000,
|
||||
worker_pool_size => 2
|
||||
}
|
||||
),
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
message := ExpectedMessage,
|
||||
details := #{reason := bad_request_timeout, resource_id := ?ID},
|
||||
deactivate_at := infinity
|
||||
}
|
||||
],
|
||||
emqx_alarm:get_alarms(activated)
|
||||
),
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
?assertEqual([], emqx_alarm:get_alarms(activated)),
|
||||
|
||||
%% 3) Request timeout > health check interval.
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
query_mode => async,
|
||||
request_timeout => 1_001,
|
||||
health_check_interval => 1_000,
|
||||
worker_pool_size => 2
|
||||
}
|
||||
),
|
||||
?assertEqual([], emqx_alarm:get_alarms(activated)),
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
?assertEqual([], emqx_alarm:get_alarms(activated)),
|
||||
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
Change the default `request_timeout` for bridges and connectors to be
|
||||
twice the default `health_check_interval`.
|
||||
|
||||
Before this change, the default values for those two options meant
|
||||
that, if a buffer ever got blocked due to resource errors or high
|
||||
message volumes, then, by the time the buffer would try to resume its
|
||||
normal operations, almost all requests would have timed out.
|
Loading…
Reference in New Issue