diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
index ed2baec8f..74d2a5ca1 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
@@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
-import(hoconsc, [mk/2, ref/2]).
@@ -140,11 +141,7 @@ fields(bridges) ->
#{
desc => ?DESC("bridges_webhook"),
required => false,
- converter => fun(X, _HoconOpts) ->
- emqx_bridge_compatible_config:upgrade_pre_ee(
- X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
- )
- end
+ converter => fun webhook_bridge_converter/2
}
)},
{mqtt,
@@ -212,3 +209,48 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
+webhook_bridge_converter(Conf0, _HoconOpts) ->
+ Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
+ Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+ ),
+ case Conf1 of
+ undefined ->
+ undefined;
+ _ ->
+ do_convert_webhook_config(Conf1)
+ end.
+
+do_convert_webhook_config(
+ #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
+) ->
+ %% ok: same values
+ Conf;
+do_convert_webhook_config(
+ #{
+ <<"request_timeout">> := ReqTRootRaw,
+ <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
+ } = Conf0
+) ->
+ %% different values; we set them to the same, if they are valid
+ %% durations
+ MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
+ MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
+ case {MReqTRoot, MReqTResource} of
+ {{ok, ReqTRoot}, {ok, ReqTResource}} ->
+ {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
+ Conf1 = emqx_map_lib:deep_merge(
+ Conf0,
+ #{
+ <<"request_timeout">> => ReqTRaw,
+ <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
+ }
+ ),
+ Conf1;
+ _ ->
+ %% invalid values; let the type checker complain about
+ %% that.
+ Conf0
+ end;
+do_convert_webhook_config(Conf) ->
+ Conf.
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 5f863ed63..d242111dc 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -818,6 +818,35 @@ t_metrics(Config) ->
),
ok.
+%% request_timeout in bridge root should match request_timeout in
+%% resource_opts.
+t_inconsistent_webhook_request_timeouts(Config) ->
+ Port = ?config(port, Config),
+ URL1 = ?URL(Port, "path1"),
+ Name = ?BRIDGE_NAME,
+ BadBridgeParams =
+ emqx_map_lib:deep_merge(
+ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
+ #{
+ <<"request_timeout">> => <<"1s">>,
+ <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
+ }
+ ),
+ {ok, 201, RawResponse} = request(
+ post,
+ uri(["bridges"]),
+ BadBridgeParams
+ ),
+ %% note: same value on both fields
+ ?assertMatch(
+ #{
+ <<"request_timeout">> := <<"2s">>,
+ <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
+ },
+ emqx_json:decode(RawResponse, [return_maps])
+ ),
+ ok.
+
operation_path(node, Oper, BridgeID) ->
uri(["nodes", node(), "bridges", BridgeID, Oper]);
operation_path(cluster, Oper, BridgeID) ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
index 5e0b4912f..acafb84ca 100644
--- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
@@ -28,6 +28,7 @@ empty_config_test() ->
webhook_config_test() ->
Conf1 = parse(webhook_v5011_hocon()),
Conf2 = parse(full_webhook_v5011_hocon()),
+ Conf3 = parse(full_webhook_v5019_hocon()),
?assertMatch(
#{
@@ -59,6 +60,26 @@ webhook_config_test() ->
check(Conf2)
),
+ %% the converter should pick the greater of the two
+ %% request_timeouts and place them in the root and inside
+ %% resource_opts.
+ ?assertMatch(
+ #{
+ <<"bridges">> := #{
+ <<"webhook">> := #{
+ <<"the_name">> :=
+ #{
+ <<"method">> := get,
+ <<"request_timeout">> := 60_000,
+ <<"resource_opts">> := #{<<"request_timeout">> := 60_000},
+ <<"body">> := <<"${payload}">>
+ }
+ }
+ }
+ },
+ check(Conf3)
+ ),
+
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
@@ -124,7 +145,7 @@ bridges{
max_retries = 3
method = \"get\"
pool_size = 4
- request_timeout = \"5s\"
+ request_timeout = \"15s\"
ssl {enable = false, verify = \"verify_peer\"}
url = \"http://localhost:8080\"
}
@@ -164,6 +185,41 @@ full_webhook_v5011_hocon() ->
"}\n"
"".
+%% does not contain direction
+full_webhook_v5019_hocon() ->
+ ""
+ "\n"
+ "bridges{\n"
+ " webhook {\n"
+ " the_name{\n"
+ " body = \"${payload}\"\n"
+ " connect_timeout = \"5s\"\n"
+ " enable_pipelining = 100\n"
+ " headers {\"content-type\" = \"application/json\"}\n"
+ " max_retries = 3\n"
+ " method = \"get\"\n"
+ " pool_size = 4\n"
+ " pool_type = \"random\"\n"
+ " request_timeout = \"1m\"\n"
+ " resource_opts = {\n"
+ " request_timeout = \"7s\"\n"
+ " }\n"
+ " ssl {\n"
+ " ciphers = \"\"\n"
+ " depth = 10\n"
+ " enable = false\n"
+ " reuse_sessions = true\n"
+ " secure_renegotiate = true\n"
+ " user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
+ " verify = \"verify_peer\"\n"
+ " versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
+ " }\n"
+ " url = \"http://localhost:8080\"\n"
+ " }\n"
+ " }\n"
+ "}\n"
+ "".
+
%% erlfmt-ignore
%% this is a generated from v5.0.11
mqtt_v5011_hocon() ->
diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
index f3ac2fd97..fb6b2eb06 100644
--- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
+++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
@@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
request_timeout {
desc {
- en: """Timeout for requests. If query_mode
is sync
, calls to the resource will be blocked for this amount of time before timing out."""
- zh: """请求的超时。 如果query_mode
是sync
,对资源的调用将在超时前被阻断这一时间。"""
+ en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
+ zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。"""
}
label {
- en: """Request timeout"""
- zh: """请求超时"""
+ en: """Request Expiry"""
+ zh: """请求超期"""
}
}
@@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only."""
batch_time {
desc {
- en: """Maximum batch waiting interval."""
- zh: """最大批量请求等待时间。"""
+ en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
+ zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。"""
}
label {
- en: """Batch time"""
- zh: """批量等待间隔"""
+ en: """Max Batch Wait Time"""
+ zh: """批量等待最大间隔"""
}
}
diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl
index fa7f2eb38..be570e694 100644
--- a/apps/emqx_resource/include/emqx_resource.hrl
+++ b/apps/emqx_resource/include/emqx_resource.hrl
@@ -97,8 +97,8 @@
-define(DEFAULT_BATCH_SIZE, 1).
%% milliseconds
--define(DEFAULT_BATCH_TIME, 20).
--define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>).
+-define(DEFAULT_BATCH_TIME, 0).
+-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>).
%% count
-define(DEFAULT_INFLIGHT, 100).
diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src
index cb26c7f09..0cc013099 100644
--- a/apps/emqx_resource/src/emqx_resource.app.src
+++ b/apps/emqx_resource/src/emqx_resource.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
- {vsn, "0.1.8"},
+ {vsn, "0.1.9"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index a8ae4454d..d5a50f351 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -196,13 +196,16 @@ init({Id, Index, Opts}) ->
InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
InflightTID = inflight_new(InflightWinSize, Id, Index),
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+ RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
+ BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+ BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
Data = #{
id => Id,
index => Index,
inflight_tid => InflightTID,
async_workers => #{},
batch_size => BatchSize,
- batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+ batch_time => BatchTime,
queue => Queue,
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
tref => undefined
@@ -1537,6 +1540,12 @@ clear_disk_queue_dir(Id, Index) ->
ensure_flush_timer(Data = #{batch_time := T}) ->
ensure_flush_timer(Data, T).
+ensure_flush_timer(Data = #{tref := undefined}, 0) ->
+ %% if the batch_time is 0, we don't need to start a timer, which
+ %% can be costly at high rates.
+ Ref = make_ref(),
+ self() ! {flush, Ref},
+ Data#{tref => {Ref, Ref}};
ensure_flush_timer(Data = #{tref := undefined}, T) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),
@@ -1639,3 +1648,53 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.
+
+%% To avoid message loss due to misconfigurations, we adjust
+%% `batch_time' based on `request_timeout'. If `batch_time' >
+%% `request_timeout', all requests will timeout before being sent if
+%% the message rate is low. Even worse if `pool_size' is high.
+%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
+adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) ->
+ BatchTime0;
+adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
+ BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
+ case BatchTime =:= BatchTime0 of
+ false ->
+ ?SLOG(info, #{
+ id => Id,
+ msg => adjusting_buffer_worker_batch_time,
+ new_batch_time => BatchTime
+ });
+ true ->
+ ok
+ end,
+ BatchTime.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+adjust_batch_time_test_() ->
+ %% just for logging
+ Id = some_id,
+ [
+ {"batch time smaller than request_time/2",
+ ?_assertEqual(
+ 100,
+ adjust_batch_time(Id, 500, 100)
+ )},
+ {"batch time equal to request_time/2",
+ ?_assertEqual(
+ 100,
+ adjust_batch_time(Id, 200, 100)
+ )},
+ {"batch time greater than request_time/2",
+ ?_assertEqual(
+ 50,
+ adjust_batch_time(Id, 100, 100)
+ )},
+ {"batch time smaller than request_time/2 (request_time = infinity)",
+ ?_assertEqual(
+ 100,
+ adjust_batch_time(Id, infinity, 100)
+ )}
+ ].
+-endif.
diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf
index af2a93f82..b8fa3b43a 100644
--- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf
+++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf
@@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub {
request_timeout {
desc {
- en: "HTTP request timeout."
- zh: "HTTP 请求超时。"
+ en: "Deprecated: Configure the request timeout in the buffer settings."
+ zh: "废弃的。在缓冲区设置中配置请求超时。"
}
label: {
en: "Request Timeout"
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
index c30c927f2..05d893a79 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
@@ -1,6 +1,6 @@
{application, emqx_ee_bridge, [
{description, "EMQX Enterprise data bridges"},
- {vsn, "0.1.5"},
+ {vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl
index e00483839..352a7163a 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl
@@ -84,6 +84,7 @@ fields(bridge_config) ->
emqx_schema:duration_ms(),
#{
required => false,
+ deprecated => {since, "e5.0.1"},
default => <<"15s">>,
desc => ?DESC("request_timeout")
}
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
index 222acb77b..452b7a4d2 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
@@ -282,7 +282,6 @@ gcp_pubsub_config(Config) ->
"bridges.gcp_pubsub.~s {\n"
" enable = true\n"
" connect_timeout = 1s\n"
- " request_timeout = 1s\n"
" service_account_json = ~s\n"
" payload_template = ~p\n"
" pubsub_topic = ~s\n"
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl
index 67a9b4a05..4eeebfaf8 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl
@@ -540,7 +540,9 @@ resource_configs() ->
<<"query_mode">> => <<"sync">>,
<<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
- <<"start_timeout">> => <<"15s">>
+ <<"start_timeout">> => <<"15s">>,
+ <<"batch_time">> => <<"4s">>,
+ <<"request_timeout">> => <<"30s">>
}
}.
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
index 898c36fe0..f07cbceab 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
@@ -33,7 +33,7 @@
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
- request_timeout := emqx_schema:duration_ms(),
+ resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()},
service_account_json := service_account_json(),
any() => term()
}.
@@ -71,7 +71,7 @@ on_start(
payload_template := PayloadTemplate,
pool_size := PoolSize,
pubsub_topic := PubSubTopic,
- request_timeout := RequestTimeout
+ resource_opts := #{request_timeout := RequestTimeout}
} = Config
) ->
?SLOG(info, #{