Merge pull request #10051 from thalesmg/adapt-batch-time-r50

fix(buffer_worker): add batch time automatic adjustment
Thales Macedo Garitezi 2023-03-06 17:21:27 -03:00 committed by GitHub
commit c78f914ccc
13 changed files with 213 additions and 25 deletions
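In short, the buffer worker now caps its effective batch time at half of the request timeout, so that requests accumulated at a low message rate cannot all expire before a batch is ever flushed. A minimal sketch of the rule, assuming millisecond values (the actual implementation is the adjust_batch_time/3 function added further below; the name effective_batch_time and the 15s/20s figures are only illustrative):

%% Sketch of the capping rule introduced by this PR (illustrative, not the PR's code).
%% An infinite request timeout leaves the configured batch time untouched.
effective_batch_time(infinity, BatchTime) ->
    BatchTime;
effective_batch_time(RequestTimeout, BatchTime) ->
    max(0, min(BatchTime, RequestTimeout div 2)).

%% e.g. a hypothetical bridge with request_timeout = 15s and batch_time = 20s
%% ends up flushing after at most 7.5s: effective_batch_time(15000, 20000) =:= 7500.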

View File

@@ -17,6 +17,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 -import(hoconsc, [mk/2, ref/2]).
@@ -140,11 +141,7 @@ fields(bridges) ->
                 #{
                     desc => ?DESC("bridges_webhook"),
                     required => false,
-                    converter => fun(X, _HoconOpts) ->
-                        emqx_bridge_compatible_config:upgrade_pre_ee(
-                            X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
-                        )
-                    end
+                    converter => fun webhook_bridge_converter/2
                 }
             )},
         {mqtt,
@@ -212,3 +209,48 @@ status() ->
 node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
+webhook_bridge_converter(Conf0, _HoconOpts) ->
+    Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
+        Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+    ),
+    case Conf1 of
+        undefined ->
+            undefined;
+        _ ->
+            do_convert_webhook_config(Conf1)
+    end.
+
+do_convert_webhook_config(
+    #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
+) ->
+    %% ok: same values
+    Conf;
+do_convert_webhook_config(
+    #{
+        <<"request_timeout">> := ReqTRootRaw,
+        <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
+    } = Conf0
+) ->
+    %% different values; we set them to the same, if they are valid
+    %% durations
+    MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
+    MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
+    case {MReqTRoot, MReqTResource} of
+        {{ok, ReqTRoot}, {ok, ReqTResource}} ->
+            {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
+            Conf1 = emqx_map_lib:deep_merge(
+                Conf0,
+                #{
+                    <<"request_timeout">> => ReqTRaw,
+                    <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
+                }
+            ),
+            Conf1;
+        _ ->
+            %% invalid values; let the type checker complain about
+            %% that.
+            Conf0
+    end;
+do_convert_webhook_config(Conf) ->
+    Conf.
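A note on the max/2 call above: it compares {ParsedMilliseconds, RawValue} tuples, and Erlang term ordering compares tuples element by element, so the raw string whose parsed duration is larger is the one written back to both fields. An illustrative shell check, with values borrowed from the v5.0.19 test fixture further below:

1> max({60000, <<"1m">>}, {7000, <<"7s">>}).
{60000,<<"1m">>}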

View File

@@ -818,6 +818,35 @@ t_metrics(Config) ->
     ),
     ok.
+
+%% request_timeout in bridge root should match request_timeout in
+%% resource_opts.
+t_inconsistent_webhook_request_timeouts(Config) ->
+    Port = ?config(port, Config),
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    BadBridgeParams =
+        emqx_map_lib:deep_merge(
+            ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
+            #{
+                <<"request_timeout">> => <<"1s">>,
+                <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
+            }
+        ),
+    {ok, 201, RawResponse} = request(
+        post,
+        uri(["bridges"]),
+        BadBridgeParams
+    ),
+    %% note: same value on both fields
+    ?assertMatch(
+        #{
+            <<"request_timeout">> := <<"2s">>,
+            <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
+        },
+        emqx_json:decode(RawResponse, [return_maps])
+    ),
+    ok.
+
 operation_path(node, Oper, BridgeID) ->
     uri(["nodes", node(), "bridges", BridgeID, Oper]);
 operation_path(cluster, Oper, BridgeID) ->

View File

@@ -28,6 +28,7 @@ empty_config_test() ->
 webhook_config_test() ->
     Conf1 = parse(webhook_v5011_hocon()),
     Conf2 = parse(full_webhook_v5011_hocon()),
+    Conf3 = parse(full_webhook_v5019_hocon()),
 
     ?assertMatch(
         #{
@@ -59,6 +60,26 @@ webhook_config_test() ->
         check(Conf2)
     ),
 
+    %% the converter should pick the greater of the two
+    %% request_timeouts and place them in the root and inside
+    %% resource_opts.
+    ?assertMatch(
+        #{
+            <<"bridges">> := #{
+                <<"webhook">> := #{
+                    <<"the_name">> :=
+                        #{
+                            <<"method">> := get,
+                            <<"request_timeout">> := 60_000,
+                            <<"resource_opts">> := #{<<"request_timeout">> := 60_000},
+                            <<"body">> := <<"${payload}">>
+                        }
+                }
+            }
+        },
+        check(Conf3)
+    ),
+
     ok.
 
 up(#{<<"bridges">> := Bridges0} = Conf0) ->
@@ -124,7 +145,7 @@ bridges{
     max_retries = 3
     method = \"get\"
     pool_size = 4
-    request_timeout = \"5s\"
+    request_timeout = \"15s\"
     ssl {enable = false, verify = \"verify_peer\"}
     url = \"http://localhost:8080\"
 }
@@ -164,6 +185,41 @@ full_webhook_v5011_hocon() ->
     "}\n"
     "".
 
+%% does not contain direction
+full_webhook_v5019_hocon() ->
+    ""
+    "\n"
+    "bridges{\n"
+    "  webhook {\n"
+    "    the_name{\n"
+    "      body = \"${payload}\"\n"
+    "      connect_timeout = \"5s\"\n"
+    "      enable_pipelining = 100\n"
+    "      headers {\"content-type\" = \"application/json\"}\n"
+    "      max_retries = 3\n"
+    "      method = \"get\"\n"
+    "      pool_size = 4\n"
+    "      pool_type = \"random\"\n"
+    "      request_timeout = \"1m\"\n"
+    "      resource_opts = {\n"
+    "        request_timeout = \"7s\"\n"
+    "      }\n"
+    "      ssl {\n"
+    "        ciphers = \"\"\n"
+    "        depth = 10\n"
+    "        enable = false\n"
+    "        reuse_sessions = true\n"
+    "        secure_renegotiate = true\n"
+    "        user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
+    "        verify = \"verify_peer\"\n"
+    "        versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
+    "      }\n"
+    "      url = \"http://localhost:8080\"\n"
+    "    }\n"
+    "  }\n"
+    "}\n"
+    "".
+
 %% erlfmt-ignore
 %% this is a generated from v5.0.11
 mqtt_v5011_hocon() ->

View File

@@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
     request_timeout {
         desc {
-            en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
-            zh: """请求的超时。 如果<code>query_mode</code>是<code>sync</code>,对资源的调用将在超时前被阻断这一时间。"""
+            en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
+            zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。"""
         }
         label {
-            en: """Request timeout"""
+            en: """Request Expiry"""
             zh: """请求超时"""
         }
     }
@@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only."""
     batch_time {
         desc {
-            en: """Maximum batch waiting interval."""
-            zh: """最大批量请求等待间隔。"""
+            en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
+            zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。"""
         }
         label {
-            en: """Batch time"""
-            zh: """批量等待间隔"""
+            en: """Max Batch Wait Time"""
+            zh: """批量等待最大间隔"""
         }
     }

View File

@@ -97,8 +97,8 @@
 -define(DEFAULT_BATCH_SIZE, 1).
 
 %% milliseconds
--define(DEFAULT_BATCH_TIME, 20).
--define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>).
+-define(DEFAULT_BATCH_TIME, 0).
+-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>).
 
 %% count
 -define(DEFAULT_INFLIGHT, 100).

View File

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

View File

@@ -196,13 +196,16 @@ init({Id, Index, Opts}) ->
     InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightTID = inflight_new(InflightWinSize, Id, Index),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+    RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
+    BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+    BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
     Data = #{
         id => Id,
         index => Index,
         inflight_tid => InflightTID,
         async_workers => #{},
         batch_size => BatchSize,
-        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        batch_time => BatchTime,
         queue => Queue,
         resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
         tref => undefined
@@ -1537,6 +1540,12 @@ clear_disk_queue_dir(Id, Index) ->
 ensure_flush_timer(Data = #{batch_time := T}) ->
     ensure_flush_timer(Data, T).
 
+ensure_flush_timer(Data = #{tref := undefined}, 0) ->
+    %% if the batch_time is 0, we don't need to start a timer, which
+    %% can be costly at high rates.
+    Ref = make_ref(),
+    self() ! {flush, Ref},
+    Data#{tref => {Ref, Ref}};
 ensure_flush_timer(Data = #{tref := undefined}, T) ->
     Ref = make_ref(),
     TRef = erlang:send_after(T, self(), {flush, Ref}),
@@ -1639,3 +1648,53 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
 -else.
 do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
 -endif.
+
+%% To avoid message loss due to misconfigurations, we adjust
+%% `batch_time' based on `request_timeout'. If `batch_time' >
+%% `request_timeout', all requests will timeout before being sent if
+%% the message rate is low. Even worse if `pool_size' is high.
+%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
+adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) ->
+    BatchTime0;
+adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
+    BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
+    case BatchTime =:= BatchTime0 of
+        false ->
+            ?SLOG(info, #{
+                id => Id,
+                msg => adjusting_buffer_worker_batch_time,
+                new_batch_time => BatchTime
+            });
+        true ->
+            ok
+    end,
+    BatchTime.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+adjust_batch_time_test_() ->
+    %% just for logging
+    Id = some_id,
+    [
+        {"batch time smaller than request_time/2",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, 500, 100)
+            )},
+        {"batch time equal to request_time/2",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, 200, 100)
+            )},
+        {"batch time greater than request_time/2",
+            ?_assertEqual(
+                50,
+                adjust_batch_time(Id, 100, 100)
+            )},
+        {"batch time smaller than request_time/2 (request_time = infinity)",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, infinity, 100)
+            )}
+    ].
+-endif.

View File

@@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub {
     request_timeout {
         desc {
-            en: "HTTP request timeout."
-            zh: "HTTP 请求超时。"
+            en: "Deprecated: Configure the request timeout in the buffer settings."
+            zh: "废弃的。在缓冲区设置中配置请求超时。"
         }
         label: {
             en: "Request Timeout"

View File

@@ -1,6 +1,6 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

View File

@@ -84,6 +84,7 @@ fields(bridge_config) ->
                 emqx_schema:duration_ms(),
                 #{
                     required => false,
+                    deprecated => {since, "e5.0.1"},
                     default => <<"15s">>,
                     desc => ?DESC("request_timeout")
                 }
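With the field deprecated, the timeout for this bridge is now taken from the buffer (resource) options instead, as the connector change at the end of this diff shows. A rough sketch of the configuration map shape the connector's on_start callback expects after this change, with made-up values for illustration:

#{
    connect_timeout => 15000,
    pool_size => 8,
    pubsub_topic => <<"my-topic">>,
    resource_opts => #{request_timeout => 15000}
}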

View File

@@ -282,7 +282,6 @@ gcp_pubsub_config(Config) ->
            "bridges.gcp_pubsub.~s {\n"
            "  enable = true\n"
            "  connect_timeout = 1s\n"
-           "  request_timeout = 1s\n"
            "  service_account_json = ~s\n"
            "  payload_template = ~p\n"
            "  pubsub_topic = ~s\n"

View File

@@ -540,7 +540,9 @@ resource_configs() ->
         <<"query_mode">> => <<"sync">>,
         <<"worker_pool_size">> => <<"1">>,
         <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
-        <<"start_timeout">> => <<"15s">>
+        <<"start_timeout">> => <<"15s">>,
+        <<"batch_time">> => <<"4s">>,
+        <<"request_timeout">> => <<"30s">>
     }
 }.

View File

@@ -33,7 +33,7 @@
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
     pubsub_topic := binary(),
-    request_timeout := emqx_schema:duration_ms(),
+    resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()},
     service_account_json := service_account_json(),
     any() => term()
 }.
@@ -71,7 +71,7 @@ on_start(
         payload_template := PayloadTemplate,
         pool_size := PoolSize,
         pubsub_topic := PubSubTopic,
-        request_timeout := RequestTimeout
+        resource_opts := #{request_timeout := RequestTimeout}
     } = Config
 ) ->
     ?SLOG(info, #{