From 10425eb925f53133a24080113e7a398e990abb11 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Jun 2023 09:37:45 -0300 Subject: [PATCH 1/6] feat(resource): deprecate `auto_restart_interval` in favor of `health_check_interval` See: https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options Current problem: In 5.0.x, we have two timer options that control the state changing of buffer worker resources: auto_restart_interval and health_check_interval. - auto_restart_interval controls how often the resource attempts to transition from disconnected to connected. - health_check_interval controls how often the resource is checked and potentially moved from connected to disconnected or connecting. The existence of two independent timers for very similar purposes is confusing to users, QA and even developers. Also, an intimately related configuration is request_timeout, which can interact badly with auto_restart_interval if the latter is poorly configured: requests may always expire if request_timeout < auto_restart_interval and if the resource enters the disconnected state. For health_check_interval, we attempt to derive a sane default that gives requests a chance to retry (if request timeout is finite, then the resource retries requests with a period of min(health_check_interval, request_timeout / 3). Another problem with the separate auto_restart_interval is that its default value (60 s) is too high when compared to the default request timeout and health check, leading to the problems described above if not tuned. Proposed solution: We propose to drop auto_restart_interval in favor of health_check_interval, which will be used for both disconnected -> connected and connected -> {disconnected, connecting} transition checks. With that, the resource will attempt to reconnect at the same interval as the health check, which currently is 15 s. Also, as two smaller changes to accompany this one: - Increase the default request_timeout from 15 s to 45 s. - Rename request_timeout to request_ttl. --- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 - .../schema/emqx_bridge_compatible_config.erl | 1 - .../test/emqx_bridge_api_SUITE.erl | 86 +------------------ .../test/emqx_bridge_webhook_SUITE.erl | 1 - .../src/emqx_bridge_cassandra.app.src | 2 +- .../src/emqx_bridge_cassandra.erl | 1 - .../test/emqx_bridge_cassandra_SUITE.erl | 1 - .../src/emqx_bridge_clickhouse.app.src | 2 +- .../src/emqx_bridge_clickhouse.erl | 1 - .../src/emqx_bridge_dynamo.app.src | 2 +- .../src/emqx_bridge_dynamo.erl | 1 - .../src/emqx_bridge_iotdb.erl | 1 - .../test/emqx_bridge_iotdb_impl_SUITE.erl | 2 +- .../src/emqx_bridge_mqtt.app.src | 2 +- .../test/emqx_bridge_mqtt_SUITE.erl | 12 +-- .../src/emqx_bridge_opents.app.src | 2 +- .../src/emqx_bridge_opents.erl | 1 - .../src/emqx_bridge_oracle.app.src | 2 +- .../src/emqx_bridge_oracle.erl | 1 - .../test/emqx_bridge_oracle_SUITE.erl | 2 - .../src/emqx_bridge_pgsql.app.src | 2 +- .../src/emqx_bridge_pgsql.erl | 1 - .../test/emqx_bridge_pgsql_SUITE.erl | 1 - .../src/emqx_bridge_pulsar.erl | 3 +- .../src/emqx_bridge_rabbitmq.erl | 1 - .../src/emqx_bridge_rocketmq.erl | 1 - .../src/emqx_bridge_sqlserver.app.src | 2 +- .../src/emqx_bridge_sqlserver.erl | 1 - .../src/emqx_bridge_tdengine.erl | 1 - apps/emqx_resource/include/emqx_resource.hrl | 11 +-- .../src/emqx_resource_manager.erl | 4 +- .../src/schema/emqx_resource_schema.erl | 24 +----- .../test/emqx_resource_SUITE.erl | 2 +- .../src/emqx_ee_bridge_mysql.erl | 1 - rel/i18n/emqx_resource_schema.hocon | 6 -- scripts/test/influx/influx-bridge.conf | 1 - 36 files changed, 22 insertions(+), 165 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bffa7b7f9..ab99a2d86 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -218,7 +218,6 @@ info_example_basic(webhook) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => 15000, - auto_restart_interval => 15000, query_mode => async, inflight_window => 100, max_buffer_bytes => 100 * 1024 * 1024 @@ -244,7 +243,6 @@ mqtt_main_example() -> max_inflight => 100, resource_opts => #{ health_check_interval => <<"15s">>, - auto_restart_interval => <<"60s">>, query_mode => sync, max_buffer_bytes => 100 * 1024 * 1024 }, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index 595b75ecf..6743b9cdd 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -87,7 +87,6 @@ default_ssl() -> default_resource_opts() -> #{ <<"inflight_window">> => 100, - <<"auto_restart_interval">> => <<"60s">>, <<"health_check_interval">> => <<"15s">>, <<"max_buffer_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index ecab986e8..bbab0a09a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -86,8 +86,7 @@ groups() -> SingleOnlyTests = [ t_broken_bpapi_vsn, t_old_bpapi_vsn, - t_bridges_probe, - t_auto_restart_interval + t_bridges_probe ], ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics], [ @@ -559,89 +558,6 @@ t_http_crud_apis(Config) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config). -t_auto_restart_interval(Config) -> - Port = ?config(port, Config), - %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - - meck:new(emqx_resource, [passthrough]), - meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end), - - %% then we add a webhook bridge, using POST - %% POST /bridges/ will create a bridge - URL1 = ?URL(Port, "path1"), - Name = ?BRIDGE_NAME, - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{ - <<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"} - }, - ?check_trace( - begin - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - }}, - request_json( - post, - uri(["bridges"]), - BridgeParams, - Config - ) - ), - {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}), - {ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500) - end, - fun(Trace0) -> - Trace = ?of_kind(resource_auto_reconnect, Trace0), - ?assertMatch([#{}], Trace), - ok - end - ), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - - %% auto_retry_interval=infinity - BridgeParams1 = BridgeParams#{ - <<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"} - }, - ?check_trace( - begin - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - }}, - request_json( - post, - uri(["bridges"]), - BridgeParams1, - Config - ) - ), - {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}), - ?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)) - end, - fun(Trace0) -> - Trace = ?of_kind(resource_auto_reconnect, Trace0), - ?assertMatch([], Trace), - ok - end - ), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - meck:unload(emqx_resource). - t_http_bridges_local_topic(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 45cc82251..ac98a08d7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -182,7 +182,6 @@ bridge_async_config(#{port := Port} = Config) -> " body = \"${id}\"" " resource_opts {\n" " inflight_window = 100\n" - " auto_restart_interval = \"60s\"\n" " health_check_interval = \"15s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 1bde274f3..ea3495e0f 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, ecql]}, {env, []}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl index e8f7d50ce..2724b7c09 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl @@ -59,7 +59,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 11014a596..307ef8cdf 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -511,7 +511,6 @@ t_write_failure(Config) -> #{ <<"resource_opts">> => #{ - <<"auto_restart_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>, <<"health_check_interval">> => <<"100ms">> } diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 72669ba8f..58a92fde4 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, clickhouse, emqx_resource]}, {env, []}, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index 9abcadbba..deca42154 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -56,7 +56,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 2d2e299d2..0e202b714 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, erlcloud]}, {env, []}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl index 251e79ca2..6ddae57f7 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl @@ -52,7 +52,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index d003864fb..2948bd59c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -227,7 +227,6 @@ conn_bridge_example(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, query_mode => async, max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 434587cf0..9a868c900 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -101,7 +101,7 @@ bridge_config(TestCase, _TestGroup, Config) -> " }\n" " pool_size = 1\n" " resource_opts = {\n" - " auto_restart_interval = 5000\n" + " health_check_interval = 5000\n" " request_timeout = 30000\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 54d7ffbed..660daca9c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 81f9a3573..b44720400 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -756,10 +756,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery <<"request_timeout">> => <<"15s">>, - %% to make it check the healthy quickly - <<"health_check_interval">> => <<"0.5s">>, - %% to make it reconnect quickly - <<"auto_restart_interval">> => <<"1s">> + %% to make it check the healthy and reconnect quickly + <<"health_check_interval">> => <<"0.5s">> } } ), @@ -866,10 +864,8 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery <<"request_timeout">> => <<"15s">>, - %% to make it check the healthy quickly - <<"health_check_interval">> => <<"0.5s">>, - %% to make it reconnect quickly - <<"auto_restart_interval">> => <<"1s">> + %% to make it check the healthy and reconnect quickly + <<"health_check_interval">> => <<"0.5s">> } } ), diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index d001446b3..9037b8840 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index 2eb6a554f..cfb12453d 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -42,7 +42,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index 4a2549f7c..ad96b4744 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_oracle, [ {description, "EMQX Enterprise Oracle Database Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index 7f384c5e6..49d9d6914 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -50,7 +50,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 721beab6e..119817aaf 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -203,11 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) -> " pool_size = 1\n" " sql = \"~s\"\n" " resource_opts = {\n" - " auto_restart_interval = \"5s\"\n" " health_check_interval = \"5s\"\n" " request_timeout = \"30s\"\n" " query_mode = \"async\"\n" - " enable_batch = true\n" " batch_size = 3\n" " batch_time = \"3s\"\n" " worker_pool_size = 1\n" diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index a310b46b4..5a72107a4 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pgsql, [ {description, "EMQX Enterprise PostgreSQL Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib]}, {env, []}, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 4615b6789..12161b9b9 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -55,7 +55,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index e4f17d76a..eca841f11 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -503,7 +503,6 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"auto_restart_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>, <<"health_check_interval">> => <<"100ms">> } diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 602e9cfdd..aa1076d33 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -154,8 +154,7 @@ fields(producer_resource_opts) -> health_check_interval, resume_interval, start_after_created, - start_timeout, - auto_restart_interval + start_timeout ], lists:filtermap( fun diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl index c4897fa39..608e0a669 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl @@ -57,7 +57,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index a4a942d0e..b3149fa99 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -52,7 +52,6 @@ values(post) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index a0b4e287b..e5c5ae73d 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, odbc]}, {env, []}, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index 8a97cb2ad..6ea6424dd 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -56,7 +56,6 @@ values(post) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl index abdc26592..0b618487d 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl @@ -54,7 +54,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index c1f1d1391..b640d17e5 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -51,8 +51,10 @@ health_check_timeout => integer(), %% use start_timeout instead wait_for_resource_ready => integer(), - %% use auto_restart_interval instead + %% use health_check_interval instead auto_retry_interval => integer(), + %% use health_check_interval instead + auto_restart_interval => pos_integer() | infinity, %%======================================= Deprecated Opts END worker_pool_size => non_neg_integer(), %% use `integer()` compatibility to release 5.0.0 bpapi @@ -64,9 +66,6 @@ %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), - %% If the resource disconnected, we can set to retry starting the resource - %% periodically. - auto_restart_interval => pos_integer() | infinity, batch_size => pos_integer(), batch_time => pos_integer(), max_buffer_bytes => pos_integer(), @@ -115,10 +114,6 @@ -define(START_AFTER_CREATED, true). -define(START_AFTER_CREATED_RAW, <<"true">>). -%% milliseconds --define(AUTO_RESTART_INTERVAL, 60000). --define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). - -define(TEST_ID_PREFIX, "_probe_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d7c2a4bd3..02dda8021 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -448,11 +448,9 @@ try_read_cache(ResId) -> end. retry_actions(Data) -> - case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of + case maps:get(health_check_interval, Data#data.opts, ?HEALTHCHECK_INTERVAL) of undefined -> []; - infinity -> - []; RetryInterval -> [{state_timeout, RetryInterval, auto_retry}] end. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 8b2a68c4b..5c7572ebf 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -26,8 +26,6 @@ %% range interval in ms -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1). -define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000). --define(AUTO_RESTART_INTERVAL_RANGE_MIN, 1). --define(AUTO_RESTART_INTERVAL_RANGE_MAX, 3_600_000). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -124,29 +122,11 @@ start_timeout(required) -> false; start_timeout(_) -> undefined. auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); -auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); -auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; +auto_restart_interval(default) -> <<"15s">>; auto_restart_interval(required) -> false; -auto_restart_interval(validator) -> fun auto_restart_interval_range/1; +auto_restart_interval(deprecated) -> {since, "5.1.0"}; auto_restart_interval(_) -> undefined. -auto_restart_interval_range(infinity) -> - ok; -auto_restart_interval_range(AutoRestartInterval) when - is_integer(AutoRestartInterval) andalso - AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso - AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX --> - ok; -auto_restart_interval_range(AutoRestartInterval) -> - Message = get_out_of_range_msg( - <<"Auto Restart Interval">>, - AutoRestartInterval, - ?AUTO_RESTART_INTERVAL_RANGE_MIN, - ?AUTO_RESTART_INTERVAL_RANGE_MAX - ), - {error, Message}. - query_mode(type) -> enum([sync, async]); query_mode(desc) -> ?DESC("query_mode"); query_mode(default) -> async; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 508b8d96b..5b07523aa 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2887,7 +2887,7 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, ResourceConfig, - #{auto_restart_interval => 100, health_check_interval => 100} + #{health_check_interval => 100} ), #{?snk_kind := resource_activate_alarm, resource_id := ?ID} ), diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 7914c77e2..4d041135f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -53,7 +53,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 8fc781794..ccf8e37ac 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -1,11 +1,5 @@ emqx_resource_schema { -auto_restart_interval.desc: -"""The auto restart interval after the resource is disconnected.""" - -auto_restart_interval.label: -"""Auto Restart Interval""" - batch_size.desc: """Maximum batch count. If equal to 1, there's effectively no batching.""" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 0574ac38a..b14ecc2f1 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -7,7 +7,6 @@ bridges { precision = "ms" resource_opts { inflight_window = 100 - auto_restart_interval = "60s" batch_size = 100 batch_time = "10ms" health_check_interval = "15s" From f42ccb6262a4a24615b66c178ef0ed4149dea227 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Jun 2023 09:40:11 -0300 Subject: [PATCH 2/6] feat(resource): increase default request timeout to 45 s See https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options --- apps/emqx_resource/include/emqx_resource.hrl | 3 ++- apps/emqx_resource/src/schema/emqx_resource_schema.erl | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index b640d17e5..32d1e94d0 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -86,7 +86,8 @@ -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(45)). +-define(DEFAULT_REQUEST_TIMEOUT_RAW, <<"45s">>). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 5c7572ebf..713af5bc9 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -135,7 +135,7 @@ query_mode(_) -> undefined. request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); request_timeout(desc) -> ?DESC("request_timeout"); -request_timeout(default) -> <<"15s">>; +request_timeout(default) -> ?DEFAULT_REQUEST_TIMEOUT_RAW; request_timeout(_) -> undefined. enable_batch(type) -> boolean(); From 99796224d844ea56bee738522807a6513d885ae9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Jun 2023 10:18:08 -0300 Subject: [PATCH 3/6] refactor(resource): rename `request_timeout` -> `request_ttl` See https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options --- apps/emqx_bridge/src/emqx_bridge.erl | 4 +-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 ++-- .../test/emqx_bridge_api_SUITE.erl | 12 ++++---- .../emqx_bridge_compatible_config_tests.erl | 2 +- .../test/emqx_bridge_webhook_SUITE.erl | 12 ++++---- .../test/emqx_bridge_cassandra_SUITE.erl | 4 +-- .../test/emqx_bridge_dynamo_SUITE.erl | 2 +- .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- .../src/emqx_bridge_gcp_pubsub_connector.erl | 16 +++++----- .../test/emqx_bridge_gcp_pubsub_SUITE.erl | 4 +-- .../test/emqx_bridge_influxdb_SUITE.erl | 4 +-- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 2 +- .../test/emqx_bridge_mqtt_SUITE.erl | 4 +-- .../test/emqx_bridge_opents_SUITE.erl | 4 +-- .../test/emqx_bridge_oracle_SUITE.erl | 2 +- .../test/emqx_bridge_pgsql_SUITE.erl | 2 +- .../test/emqx_bridge_rocketmq_SUITE.erl | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 6 ++-- .../test/emqx_bridge_sqlserver_SUITE.erl | 2 +- .../test/emqx_bridge_tdengine_SUITE.erl | 4 +-- apps/emqx_resource/include/emqx_resource.hrl | 4 +-- .../src/emqx_resource_buffer_worker.erl | 30 +++++++++---------- .../src/schema/emqx_resource_schema.erl | 11 +++---- .../test/emqx_resource_SUITE.erl | 8 ++--- .../test/emqx_ee_bridge_mysql_SUITE.erl | 2 +- .../test/emqx_ee_bridge_redis_SUITE.erl | 2 +- rel/i18n/emqx_bridge_gcp_pubsub.hocon | 6 ---- rel/i18n/emqx_resource_schema.hocon | 6 ++-- scripts/test/influx/influx-bridge.conf | 2 +- 29 files changed, 81 insertions(+), 86 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 46c822ed0..591dd9e0d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -216,9 +216,9 @@ send_message(BridgeType, BridgeName, ResId, Message) -> end. query_opts(Config) -> - case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of + case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> - %% request_timeout is configured + %% request_ttl is configured #{timeout => Timeout}; _ -> %% emqx_resource has a default value (15s) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index eee39bd56..d7234a6bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -327,8 +327,8 @@ parse_confs( Reason1 = emqx_utils:readable_error_msg(Reason), invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) end, - RequestTimeout = emqx_utils_maps:deep_get( - [resource_opts, request_timeout], + RequestTTL = emqx_utils_maps:deep_get( + [resource_opts, request_ttl], Conf ), Conf#{ @@ -339,7 +339,7 @@ parse_confs( method => Method, body => maps:get(body, Conf, undefined), headers => Headers, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, max_retries => Retry } }; diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index bbab0a09a..2f5d65259 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1300,7 +1300,7 @@ t_metrics(Config) -> ), ok. -%% request_timeout in bridge root should match request_timeout in +%% request_timeout in bridge root should match request_ttl in %% resource_opts. t_inconsistent_webhook_request_timeouts(Config) -> Port = ?config(port, Config), @@ -1311,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) -> ?HTTP_BRIDGE(URL1, Name), #{ <<"request_timeout">> => <<"1s">>, - <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>} } ), %% root request_timeout is deprecated for bridge. @@ -1326,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) -> Config ), ?assertNot(maps:is_key(<<"request_timeout">>, Response)), - ?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts), - validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name), + ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts), + validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name), ok. t_cluster_later_join_metrics(Config) -> @@ -1368,7 +1368,7 @@ t_cluster_later_join_metrics(Config) -> ), ok. -validate_resource_request_timeout(single, Timeout, Name) -> +validate_resource_request_ttl(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), @@ -1388,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) -> ok end ); -validate_resource_request_timeout(_Cluster, _Timeout, _Name) -> +validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> ignore. %% diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 7c8ee4f4d..540c18878 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -71,7 +71,7 @@ webhook_config_test() -> } } } = check(Conf3), - ?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts), + ?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts), ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index ac98a08d7..4fc76fc9e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) -> ConnectTimeout = maps:get(connect_timeout, Config, 1), RequestTimeout = maps:get(request_timeout, Config, 10000), ResumeInterval = maps:get(resume_interval, Config, "1s"), - ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"), + ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" @@ -185,7 +185,7 @@ bridge_async_config(#{port := Port} = Config) -> " health_check_interval = \"15s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" - " request_timeout = \"~p\"\n" + " request_ttl = \"~p\"\n" " resume_interval = \"~s\"\n" " start_after_created = \"true\"\n" " start_timeout = \"5s\"\n" @@ -203,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) -> PoolSize, RequestTimeout, QueryMode, - ResourceRequestTimeout, + ResourceRequestTTL, ResumeInterval ] ), @@ -246,7 +246,7 @@ t_send_async_connection_timeout(_Config) -> query_mode => "async", connect_timeout => ResponseDelayMS * 2, request_timeout => 10000, - resource_request_timeout => "infinity" + resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, [ @@ -268,7 +268,7 @@ t_async_free_retries(_Config) -> query_mode => "sync", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. Context = #{error_attempts => 5}, @@ -294,7 +294,7 @@ t_async_common_retries(_Config) -> resume_interval => "100ms", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. Context = #{error_attempts => infinity}, diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 307ef8cdf..5525a640c 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) -> " password = ~p\n" " cql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -635,7 +635,7 @@ t_bad_sql_parameter(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index da87f6047..ac2b59229 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) -> " aws_access_key_id = ~p\n" " aws_secret_access_key = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 2b3d359d3..f7401d423 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index be5e56e85..d2f6bbee4 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -32,7 +32,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -44,7 +44,7 @@ pool_name := binary(), project_id := binary(), pubsub_topic := binary(), - request_timeout := timer:time() + request_ttl := infinity | timer:time() }. -type headers() :: [{binary(), iodata()}]. -type body() :: iodata(). @@ -69,7 +69,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - resource_opts := #{request_timeout := RequestTimeout} + resource_opts := #{request_ttl := RequestTTL} } = Config ) -> ?SLOG(info, #{ @@ -108,7 +108,7 @@ on_start( pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, - request_timeout => RequestTimeout + request_ttl => RequestTTL }, ?tp( gcp_pubsub_on_start_before_starting_pool, @@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> #{ pool_name := PoolName, max_retries := MaxRetries, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> PoolName, Method, Request, - RequestTimeout, + RequestTTL, MaxRetries ) of @@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> Worker, Method, Request, - RequestTimeout, + RequestTTL, {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} ), {ok, Worker}. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 65b88d45b..b5f22f727 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -287,7 +287,7 @@ gcp_pubsub_config(Config) -> " pool_size = 1\n" " pipelining = ~b\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " metrics_flush_interval = 700ms\n" " worker_pool_size = 1\n" " query_mode = ~s\n" @@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) -> ServiceAccountJSON = ?config(service_account_json, Config), Topic = <<"t/topic">>, {ok, _} = create_bridge(Config, #{ - <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>} }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 825721052..8421f4e21 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 9a868c900..2c24466df 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -102,7 +102,7 @@ bridge_config(TestCase, _TestGroup, Config) -> " pool_size = 1\n" " resource_opts = {\n" " health_check_interval = 5000\n" - " request_timeout = 30000\n" + " request_ttl = 30000\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" " }\n" diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index b44720400..d14e0597e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -755,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, + <<"request_ttl">> => <<"15s">>, %% to make it check the healthy and reconnect quickly <<"health_check_interval">> => <<"0.5s">> } @@ -863,7 +863,7 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, + <<"request_ttl">> => <<"15s">>, %% to make it check the healthy and reconnect quickly <<"health_check_interval">> => <<"0.5s">> } diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 6f444b93e..3563e0774 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -127,7 +127,7 @@ opents_config(BridgeType, Config) -> " enable = true\n" " server = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = sync\n" " }\n" @@ -298,7 +298,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 119817aaf..48d784633 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -204,7 +204,7 @@ oracle_config(TestCase, _ConnectionType, Config) -> " sql = \"~s\"\n" " resource_opts = {\n" " health_check_interval = \"5s\"\n" - " request_timeout = \"30s\"\n" + " request_ttl = \"30s\"\n" " query_mode = \"async\"\n" " batch_size = 3\n" " batch_time = \"3s\"\n" diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index eca841f11..4e7da85bd 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 90047e577..202c9cc4d 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -139,7 +139,7 @@ rocketmq_config(BridgeType, Config) -> " servers = ~p\n" " topic = ~p\n" " resource_opts = {\n" - " request_timeout = 1500ms\n" + " request_ttl = 1500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index ed8134051..72962229a 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -55,8 +55,8 @@ default_port => ?SQLSERVER_DEFAULT_PORT }). --define(REQUEST_TIMEOUT(RESOURCE_OPTS), - maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT) +-define(REQUEST_TTL(RESOURCE_OPTS), + maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL) ). -define(BATCH_INSERT_TEMP, batch_insert_temp). @@ -388,7 +388,7 @@ worker_do_insert( ) -> LogMeta = #{connector => ResourceId, state => State}, try - case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of + case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of {selected, Rows, _} -> {ok, Rows}; {updated, _} -> diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index fcf20da8f..0e60e9c97 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) -> " sql = ~p\n" " driver = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 0b8d20f15..55f8bb69e 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -456,7 +456,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 32d1e94d0..562e18d52 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -86,8 +86,8 @@ -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(45)). --define(DEFAULT_REQUEST_TIMEOUT_RAW, <<"45s">>). +-define(DEFAULT_REQUEST_TTL, timer:seconds(45)). +-define(DEFAULT_REQUEST_TTL_RAW, <<"45s">>). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 167dcc02e..7a0bcaea9 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -76,7 +76,7 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). --type request_timeout() :: infinity | timer:time(). +-type request_ttl() :: infinity | timer:time(). -type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). @@ -187,10 +187,10 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), - RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), - BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), - DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0), + DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval), ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL), Data0 = #{ @@ -1733,7 +1733,7 @@ now_() -> ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> Opts; ensure_timeout_query_opts(#{} = Opts0, sync) -> - Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT}; + Opts0#{timeout => ?DEFAULT_REQUEST_TTL}; ensure_timeout_query_opts(#{} = Opts0, async) -> Opts0#{timeout => infinity}. @@ -1760,14 +1760,14 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, -endif. %% To avoid message loss due to misconfigurations, we adjust -%% `batch_time' based on `request_timeout'. If `batch_time' > -%% `request_timeout', all requests will timeout before being sent if +%% `batch_time' based on `request_ttl'. If `batch_time' > +%% `request_ttl', all requests will timeout before being sent if %% the message rate is low. Even worse if `pool_size' is high. -%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. -adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> +%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) -> BatchTime0; -adjust_batch_time(Id, RequestTimeout, BatchTime0) -> - BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), +adjust_batch_time(Id, RequestTTL, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> ?SLOG(info, #{ @@ -1811,11 +1811,11 @@ replayq_opts(Id, Index, Opts) -> %% timeout is <= resume interval and the buffer worker is ever %% blocked, than all queued requests will basically fail without being %% attempted. --spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). -default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> +-spec default_resume_interval(request_ttl(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) -> max(1, HealthCheckInterval); -default_resume_interval(RequestTimeout, HealthCheckInterval) -> - max(1, min(HealthCheckInterval, RequestTimeout div 3)). +default_resume_interval(RequestTTL, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTTL div 3)). -spec reply_call(reference(), term()) -> ok. reply_call(Alias, Response) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 713af5bc9..f427fbbd0 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -53,7 +53,7 @@ fields("creation_opts") -> {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, - {request_timeout, fun request_timeout/1}, + {request_ttl, fun request_ttl/1}, {inflight_window, fun inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -133,10 +133,11 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); -request_timeout(desc) -> ?DESC("request_timeout"); -request_timeout(default) -> ?DEFAULT_REQUEST_TIMEOUT_RAW; -request_timeout(_) -> undefined. +request_ttl(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); +request_ttl(aliases) -> [request_timeout]; +request_ttl(desc) -> ?DESC("request_ttl"); +request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW; +request_ttl(_) -> undefined. enable_batch(type) -> boolean(); enable_batch(required) -> false; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5b07523aa..5883614aa 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2823,7 +2823,7 @@ t_volatile_offload_mode(_Config) -> t_late_call_reply(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), - RequestTimeout = 500, + RequestTTL = 500, ?assertMatch( {ok, _}, emqx_resource:create( @@ -2833,7 +2833,7 @@ t_late_call_reply(_Config) -> #{name => test_resource}, #{ buffer_mode => memory_only, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, query_mode => sync } ) @@ -2844,13 +2844,13 @@ t_late_call_reply(_Config) -> %% have been already returned (a timeout), but the resource will %% still send a message with the reply. %% The demo connector will reply with `{error, timeout}' after 1 s. - SleepFor = RequestTimeout + 500, + SleepFor = RequestTTL + 500, ?assertMatch( {error, {resource_error, #{reason := timeout}}}, emqx_resource:query( ?ID, {sync_sleep_before_reply, SleepFor}, - #{timeout => RequestTimeout} + #{timeout => RequestTTL} ) ), %% Our process shouldn't receive any late messages. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 707aa47ea..4af180a2f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -178,7 +178,7 @@ mysql_config(BridgeType, Config) -> " pool_size = ~b\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 56f932aba..c9dc402bd 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -554,7 +554,7 @@ resource_configs(#{query_mode := QueryMode}) -> <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"start_timeout">> => <<"15s">>, <<"batch_time">> => <<"4s">>, - <<"request_timeout">> => <<"30s">> + <<"request_ttl">> => <<"30s">> } }. diff --git a/rel/i18n/emqx_bridge_gcp_pubsub.hocon b/rel/i18n/emqx_bridge_gcp_pubsub.hocon index cc255aec3..ca6b855dc 100644 --- a/rel/i18n/emqx_bridge_gcp_pubsub.hocon +++ b/rel/i18n/emqx_bridge_gcp_pubsub.hocon @@ -64,12 +64,6 @@ pubsub_topic.desc: pubsub_topic.label: """GCP PubSub Topic""" -request_timeout.desc: -"""Deprecated: Configure the request timeout in the buffer settings.""" - -request_timeout.label: -"""Request Timeout""" - service_account_json.desc: """JSON containing the GCP Service Account credentials to be used with PubSub. When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed.""" diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index ccf8e37ac..cb26e4182 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -70,11 +70,11 @@ query_mode.desc: query_mode.label: """Query mode""" -request_timeout.desc: +request_ttl.desc: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" -request_timeout.label: -"""Request Expiry""" +request_ttl.label: +"""Request TTL""" resource_opts.desc: """Resource options.""" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index b14ecc2f1..08013185a 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -12,7 +12,7 @@ bridges { health_check_interval = "15s" max_buffer_bytes = "1GB" query_mode = "sync" - request_timeout = "15s" + request_ttl = "15s" start_after_created = "true" start_timeout = "5s" worker_pool_size = 4 From 3e4790edd4c3ed8b692d9ca24c0a6b93d0a42bdc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Jun 2023 13:01:58 -0300 Subject: [PATCH 4/6] test(pulsar_producer): fix flaky test --- .../test/emqx_bridge_pulsar_impl_producer_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 9dc2f05d6..ce14eb83d 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -1040,7 +1040,7 @@ t_resource_manager_crash_before_producers_started(Config) -> end), %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + {{error, {config_update_crashed, _}}, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, From 0790c88aaf522e664b956f1cd7fca0831887934e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 2 Jun 2023 09:08:11 -0300 Subject: [PATCH 5/6] refactor: use default's type as first union member Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_resource/src/schema/emqx_resource_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index f427fbbd0..7f9886a5d 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -133,7 +133,7 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -request_ttl(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); +request_ttl(type) -> hoconsc:union([emqx_schema:duration_ms(), infinity]); request_ttl(aliases) -> [request_timeout]; request_ttl(desc) -> ?DESC("request_ttl"); request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW; From 940353cc52760dbf941acecb71685ac98ae12ef6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 2 Jun 2023 09:13:47 -0300 Subject: [PATCH 6/6] docs: add changelog --- changes/ce/feat-10910.en.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ce/feat-10910.en.md diff --git a/changes/ce/feat-10910.en.md b/changes/ce/feat-10910.en.md new file mode 100644 index 000000000..40373e1a1 --- /dev/null +++ b/changes/ce/feat-10910.en.md @@ -0,0 +1,3 @@ +The data bridge resource option `auto_restart_interval` was deprecated in favor of `health_check_interval`, and `request_timeout` was renamed to `request_ttl`. Also, the default `request_ttl` value went from 15 seconds to 45 seconds. + +The previous existence of both `auto_restart_interval` and `health_check_interval` was a source of confusion, as both parameters influenced the recovery of data bridges under failures. An inconsistent configuration of those two parameters could lead to messages being expired without a chance to retry. Now, `health_check_interval` is used both to control the periodicity of health checks that may transition the data bridge into `disconnected` or `connecting` states, as well as recovering from `disconnected`.