diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d5d26c770..e282c3dd8 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -221,9 +221,9 @@ send_message(BridgeType, BridgeName, ResId, Message) -> end. query_opts(Config) -> - case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of + case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> - %% request_timeout is configured + %% request_ttl is configured #{timeout => Timeout}; _ -> %% emqx_resource has a default value (15s) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bffa7b7f9..ab99a2d86 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -218,7 +218,6 @@ info_example_basic(webhook) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => 15000, - auto_restart_interval => 15000, query_mode => async, inflight_window => 100, max_buffer_bytes => 100 * 1024 * 1024 @@ -244,7 +243,6 @@ mqtt_main_example() -> max_inflight => 100, resource_opts => #{ health_check_interval => <<"15s">>, - auto_restart_interval => <<"60s">>, query_mode => sync, max_buffer_bytes => 100 * 1024 * 1024 }, diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index eee39bd56..d7234a6bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -327,8 +327,8 @@ parse_confs( Reason1 = emqx_utils:readable_error_msg(Reason), invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) end, - RequestTimeout = emqx_utils_maps:deep_get( - [resource_opts, request_timeout], + RequestTTL = emqx_utils_maps:deep_get( + [resource_opts, request_ttl], Conf ), Conf#{ @@ -339,7 +339,7 @@ parse_confs( method => Method, body => maps:get(body, Conf, undefined), headers => Headers, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, max_retries => Retry } }; diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index 595b75ecf..6743b9cdd 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -87,7 +87,6 @@ default_ssl() -> default_resource_opts() -> #{ <<"inflight_window">> => 100, - <<"auto_restart_interval">> => <<"60s">>, <<"health_check_interval">> => <<"15s">>, <<"max_buffer_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index ecab986e8..2f5d65259 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -86,8 +86,7 @@ groups() -> SingleOnlyTests = [ t_broken_bpapi_vsn, t_old_bpapi_vsn, - t_bridges_probe, - t_auto_restart_interval + t_bridges_probe ], ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics], [ @@ -559,89 +558,6 @@ t_http_crud_apis(Config) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config). -t_auto_restart_interval(Config) -> - Port = ?config(port, Config), - %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - - meck:new(emqx_resource, [passthrough]), - meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end), - - %% then we add a webhook bridge, using POST - %% POST /bridges/ will create a bridge - URL1 = ?URL(Port, "path1"), - Name = ?BRIDGE_NAME, - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{ - <<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"} - }, - ?check_trace( - begin - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - }}, - request_json( - post, - uri(["bridges"]), - BridgeParams, - Config - ) - ), - {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}), - {ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500) - end, - fun(Trace0) -> - Trace = ?of_kind(resource_auto_reconnect, Trace0), - ?assertMatch([#{}], Trace), - ok - end - ), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - - %% auto_retry_interval=infinity - BridgeParams1 = BridgeParams#{ - <<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"} - }, - ?check_trace( - begin - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - }}, - request_json( - post, - uri(["bridges"]), - BridgeParams1, - Config - ) - ), - {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}), - ?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)) - end, - fun(Trace0) -> - Trace = ?of_kind(resource_auto_reconnect, Trace0), - ?assertMatch([], Trace), - ok - end - ), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), - {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - meck:unload(emqx_resource). - t_http_bridges_local_topic(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first @@ -1384,7 +1300,7 @@ t_metrics(Config) -> ), ok. -%% request_timeout in bridge root should match request_timeout in +%% request_timeout in bridge root should match request_ttl in %% resource_opts. t_inconsistent_webhook_request_timeouts(Config) -> Port = ?config(port, Config), @@ -1395,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) -> ?HTTP_BRIDGE(URL1, Name), #{ <<"request_timeout">> => <<"1s">>, - <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>} } ), %% root request_timeout is deprecated for bridge. @@ -1410,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) -> Config ), ?assertNot(maps:is_key(<<"request_timeout">>, Response)), - ?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts), - validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name), + ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts), + validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name), ok. t_cluster_later_join_metrics(Config) -> @@ -1452,7 +1368,7 @@ t_cluster_later_join_metrics(Config) -> ), ok. -validate_resource_request_timeout(single, Timeout, Name) -> +validate_resource_request_ttl(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), @@ -1472,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) -> ok end ); -validate_resource_request_timeout(_Cluster, _Timeout, _Name) -> +validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> ignore. %% diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 7c8ee4f4d..540c18878 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -71,7 +71,7 @@ webhook_config_test() -> } } } = check(Conf3), - ?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts), + ?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts), ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 45cc82251..4fc76fc9e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) -> ConnectTimeout = maps:get(connect_timeout, Config, 1), RequestTimeout = maps:get(request_timeout, Config, 10000), ResumeInterval = maps:get(resume_interval, Config, "1s"), - ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"), + ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" @@ -182,11 +182,10 @@ bridge_async_config(#{port := Port} = Config) -> " body = \"${id}\"" " resource_opts {\n" " inflight_window = 100\n" - " auto_restart_interval = \"60s\"\n" " health_check_interval = \"15s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" - " request_timeout = \"~p\"\n" + " request_ttl = \"~p\"\n" " resume_interval = \"~s\"\n" " start_after_created = \"true\"\n" " start_timeout = \"5s\"\n" @@ -204,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) -> PoolSize, RequestTimeout, QueryMode, - ResourceRequestTimeout, + ResourceRequestTTL, ResumeInterval ] ), @@ -247,7 +246,7 @@ t_send_async_connection_timeout(_Config) -> query_mode => "async", connect_timeout => ResponseDelayMS * 2, request_timeout => 10000, - resource_request_timeout => "infinity" + resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, [ @@ -269,7 +268,7 @@ t_async_free_retries(_Config) -> query_mode => "sync", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. Context = #{error_attempts => 5}, @@ -295,7 +294,7 @@ t_async_common_retries(_Config) -> resume_interval => "100ms", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. Context = #{error_attempts => infinity}, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl index e8f7d50ce..2724b7c09 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl @@ -59,7 +59,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 11014a596..5525a640c 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) -> " password = ~p\n" " cql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -511,7 +511,6 @@ t_write_failure(Config) -> #{ <<"resource_opts">> => #{ - <<"auto_restart_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>, <<"health_check_interval">> => <<"100ms">> } @@ -636,7 +635,7 @@ t_bad_sql_parameter(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index 9abcadbba..deca42154 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -56,7 +56,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl index 251e79ca2..6ddae57f7 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.erl @@ -52,7 +52,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index da87f6047..ac2b59229 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) -> " aws_access_key_id = ~p\n" " aws_secret_access_key = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 2b3d359d3..f7401d423 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index be5e56e85..d2f6bbee4 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -32,7 +32,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -44,7 +44,7 @@ pool_name := binary(), project_id := binary(), pubsub_topic := binary(), - request_timeout := timer:time() + request_ttl := infinity | timer:time() }. -type headers() :: [{binary(), iodata()}]. -type body() :: iodata(). @@ -69,7 +69,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - resource_opts := #{request_timeout := RequestTimeout} + resource_opts := #{request_ttl := RequestTTL} } = Config ) -> ?SLOG(info, #{ @@ -108,7 +108,7 @@ on_start( pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, - request_timeout => RequestTimeout + request_ttl => RequestTTL }, ?tp( gcp_pubsub_on_start_before_starting_pool, @@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> #{ pool_name := PoolName, max_retries := MaxRetries, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> PoolName, Method, Request, - RequestTimeout, + RequestTTL, MaxRetries ) of @@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> Worker, Method, Request, - RequestTimeout, + RequestTTL, {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} ), {ok, Worker}. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 65b88d45b..b5f22f727 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -287,7 +287,7 @@ gcp_pubsub_config(Config) -> " pool_size = 1\n" " pipelining = ~b\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " metrics_flush_interval = 700ms\n" " worker_pool_size = 1\n" " query_mode = ~s\n" @@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) -> ServiceAccountJSON = ?config(service_account_json, Config), Topic = <<"t/topic">>, {ok, _} = create_bridge(Config, #{ - <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>} }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 825721052..8421f4e21 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index d003864fb..2948bd59c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -227,7 +227,6 @@ conn_bridge_example(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, query_mode => async, max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 434587cf0..2c24466df 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -101,8 +101,8 @@ bridge_config(TestCase, _TestGroup, Config) -> " }\n" " pool_size = 1\n" " resource_opts = {\n" - " auto_restart_interval = 5000\n" - " request_timeout = 30000\n" + " health_check_interval = 5000\n" + " request_ttl = 30000\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" " }\n" diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 54d7ffbed..660daca9c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 81f9a3573..d14e0597e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -755,11 +755,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, - %% to make it check the healthy quickly - <<"health_check_interval">> => <<"0.5s">>, - %% to make it reconnect quickly - <<"auto_restart_interval">> => <<"1s">> + <<"request_ttl">> => <<"15s">>, + %% to make it check the healthy and reconnect quickly + <<"health_check_interval">> => <<"0.5s">> } } ), @@ -865,11 +863,9 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, - %% to make it check the healthy quickly - <<"health_check_interval">> => <<"0.5s">>, - %% to make it reconnect quickly - <<"auto_restart_interval">> => <<"1s">> + <<"request_ttl">> => <<"15s">>, + %% to make it check the healthy and reconnect quickly + <<"health_check_interval">> => <<"0.5s">> } } ), diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index 2eb6a554f..cfb12453d 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -42,7 +42,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 6f444b93e..3563e0774 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -127,7 +127,7 @@ opents_config(BridgeType, Config) -> " enable = true\n" " server = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = sync\n" " }\n" @@ -298,7 +298,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index 500059967..46e118c69 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -51,7 +51,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 2e72458b6..9355e217a 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -203,11 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) -> " pool_size = 1\n" " sql = \"~s\"\n" " resource_opts = {\n" - " auto_restart_interval = \"5s\"\n" " health_check_interval = \"5s\"\n" - " request_timeout = \"30s\"\n" + " request_ttl = \"30s\"\n" " query_mode = \"async\"\n" - " enable_batch = true\n" " batch_size = 3\n" " batch_time = \"3s\"\n" " worker_pool_size = 1\n" diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index a310b46b4..5a72107a4 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pgsql, [ {description, "EMQX Enterprise PostgreSQL Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib]}, {env, []}, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 4615b6789..12161b9b9 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -55,7 +55,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index e4f17d76a..4e7da85bd 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -503,7 +503,6 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"auto_restart_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>, <<"health_check_interval">> => <<"100ms">> } diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 602e9cfdd..aa1076d33 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -154,8 +154,7 @@ fields(producer_resource_opts) -> health_check_interval, resume_interval, start_after_created, - start_timeout, - auto_restart_interval + start_timeout ], lists:filtermap( fun diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 9dc2f05d6..ce14eb83d 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -1040,7 +1040,7 @@ t_resource_manager_crash_before_producers_started(Config) -> end), %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + {{error, {config_update_crashed, _}}, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl index c4897fa39..608e0a669 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.erl @@ -57,7 +57,6 @@ values(_Method, Type) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index a4a942d0e..b3149fa99 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -52,7 +52,6 @@ values(post) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index a80aee810..62e1a7b3f 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -149,7 +149,7 @@ rocketmq_config(BridgeType, Config) -> " secret_key = ~p\n" " topic = ~p\n" " resource_opts = {\n" - " request_timeout = 1500ms\n" + " request_ttl = 1500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index 8a97cb2ad..6ea6424dd 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -56,7 +56,6 @@ values(post) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 341f89852..52bd910db 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -55,8 +55,8 @@ default_port => ?SQLSERVER_DEFAULT_PORT }). --define(REQUEST_TIMEOUT(RESOURCE_OPTS), - maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT) +-define(REQUEST_TTL(RESOURCE_OPTS), + maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL) ). -define(BATCH_INSERT_TEMP, batch_insert_temp). @@ -394,7 +394,7 @@ worker_do_insert( ) -> LogMeta = #{connector => ResourceId, state => State}, try - case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of + case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of {selected, Rows, _} -> {ok, Rows}; {updated, _} -> diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index fcf20da8f..0e60e9c97 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) -> " sql = ~p\n" " driver = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl index abdc26592..0b618487d 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl @@ -54,7 +54,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 8, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 0b8d20f15..55f8bb69e 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -456,7 +456,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index c1f1d1391..562e18d52 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -51,8 +51,10 @@ health_check_timeout => integer(), %% use start_timeout instead wait_for_resource_ready => integer(), - %% use auto_restart_interval instead + %% use health_check_interval instead auto_retry_interval => integer(), + %% use health_check_interval instead + auto_restart_interval => pos_integer() | infinity, %%======================================= Deprecated Opts END worker_pool_size => non_neg_integer(), %% use `integer()` compatibility to release 5.0.0 bpapi @@ -64,9 +66,6 @@ %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), - %% If the resource disconnected, we can set to retry starting the resource - %% periodically. - auto_restart_interval => pos_integer() | infinity, batch_size => pos_integer(), batch_time => pos_integer(), max_buffer_bytes => pos_integer(), @@ -87,7 +86,8 @@ -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). +-define(DEFAULT_REQUEST_TTL, timer:seconds(45)). +-define(DEFAULT_REQUEST_TTL_RAW, <<"45s">>). %% count -define(DEFAULT_BATCH_SIZE, 1). @@ -115,10 +115,6 @@ -define(START_AFTER_CREATED, true). -define(START_AFTER_CREATED_RAW, <<"true">>). -%% milliseconds --define(AUTO_RESTART_INTERVAL, 60000). --define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). - -define(TEST_ID_PREFIX, "_probe_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 167dcc02e..7a0bcaea9 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -76,7 +76,7 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). --type request_timeout() :: infinity | timer:time(). +-type request_ttl() :: infinity | timer:time(). -type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). @@ -187,10 +187,10 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), - RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), - BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), - DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0), + DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval), ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL), Data0 = #{ @@ -1733,7 +1733,7 @@ now_() -> ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> Opts; ensure_timeout_query_opts(#{} = Opts0, sync) -> - Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT}; + Opts0#{timeout => ?DEFAULT_REQUEST_TTL}; ensure_timeout_query_opts(#{} = Opts0, async) -> Opts0#{timeout => infinity}. @@ -1760,14 +1760,14 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, -endif. %% To avoid message loss due to misconfigurations, we adjust -%% `batch_time' based on `request_timeout'. If `batch_time' > -%% `request_timeout', all requests will timeout before being sent if +%% `batch_time' based on `request_ttl'. If `batch_time' > +%% `request_ttl', all requests will timeout before being sent if %% the message rate is low. Even worse if `pool_size' is high. -%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. -adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> +%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) -> BatchTime0; -adjust_batch_time(Id, RequestTimeout, BatchTime0) -> - BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), +adjust_batch_time(Id, RequestTTL, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> ?SLOG(info, #{ @@ -1811,11 +1811,11 @@ replayq_opts(Id, Index, Opts) -> %% timeout is <= resume interval and the buffer worker is ever %% blocked, than all queued requests will basically fail without being %% attempted. --spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). -default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> +-spec default_resume_interval(request_ttl(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) -> max(1, HealthCheckInterval); -default_resume_interval(RequestTimeout, HealthCheckInterval) -> - max(1, min(HealthCheckInterval, RequestTimeout div 3)). +default_resume_interval(RequestTTL, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTTL div 3)). -spec reply_call(reference(), term()) -> ok. reply_call(Alias, Response) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d7c2a4bd3..02dda8021 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -448,11 +448,9 @@ try_read_cache(ResId) -> end. retry_actions(Data) -> - case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of + case maps:get(health_check_interval, Data#data.opts, ?HEALTHCHECK_INTERVAL) of undefined -> []; - infinity -> - []; RetryInterval -> [{state_timeout, RetryInterval, auto_retry}] end. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 8b2a68c4b..7f9886a5d 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -26,8 +26,6 @@ %% range interval in ms -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1). -define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000). --define(AUTO_RESTART_INTERVAL_RANGE_MIN, 1). --define(AUTO_RESTART_INTERVAL_RANGE_MAX, 3_600_000). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -55,7 +53,7 @@ fields("creation_opts") -> {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, - {request_timeout, fun request_timeout/1}, + {request_ttl, fun request_ttl/1}, {inflight_window, fun inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -124,39 +122,22 @@ start_timeout(required) -> false; start_timeout(_) -> undefined. auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); -auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); -auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; +auto_restart_interval(default) -> <<"15s">>; auto_restart_interval(required) -> false; -auto_restart_interval(validator) -> fun auto_restart_interval_range/1; +auto_restart_interval(deprecated) -> {since, "5.1.0"}; auto_restart_interval(_) -> undefined. -auto_restart_interval_range(infinity) -> - ok; -auto_restart_interval_range(AutoRestartInterval) when - is_integer(AutoRestartInterval) andalso - AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso - AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX --> - ok; -auto_restart_interval_range(AutoRestartInterval) -> - Message = get_out_of_range_msg( - <<"Auto Restart Interval">>, - AutoRestartInterval, - ?AUTO_RESTART_INTERVAL_RANGE_MIN, - ?AUTO_RESTART_INTERVAL_RANGE_MAX - ), - {error, Message}. - query_mode(type) -> enum([sync, async]); query_mode(desc) -> ?DESC("query_mode"); query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); -request_timeout(desc) -> ?DESC("request_timeout"); -request_timeout(default) -> <<"15s">>; -request_timeout(_) -> undefined. +request_ttl(type) -> hoconsc:union([emqx_schema:duration_ms(), infinity]); +request_ttl(aliases) -> [request_timeout]; +request_ttl(desc) -> ?DESC("request_ttl"); +request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW; +request_ttl(_) -> undefined. enable_batch(type) -> boolean(); enable_batch(required) -> false; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 508b8d96b..5883614aa 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2823,7 +2823,7 @@ t_volatile_offload_mode(_Config) -> t_late_call_reply(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), - RequestTimeout = 500, + RequestTTL = 500, ?assertMatch( {ok, _}, emqx_resource:create( @@ -2833,7 +2833,7 @@ t_late_call_reply(_Config) -> #{name => test_resource}, #{ buffer_mode => memory_only, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, query_mode => sync } ) @@ -2844,13 +2844,13 @@ t_late_call_reply(_Config) -> %% have been already returned (a timeout), but the resource will %% still send a message with the reply. %% The demo connector will reply with `{error, timeout}' after 1 s. - SleepFor = RequestTimeout + 500, + SleepFor = RequestTTL + 500, ?assertMatch( {error, {resource_error, #{reason := timeout}}}, emqx_resource:query( ?ID, {sync_sleep_before_reply, SleepFor}, - #{timeout => RequestTimeout} + #{timeout => RequestTTL} ) ), %% Our process shouldn't receive any late messages. @@ -2887,7 +2887,7 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, ResourceConfig, - #{auto_restart_interval => 100, health_check_interval => 100} + #{health_check_interval => 100} ), #{?snk_kind := resource_activate_alarm, resource_id := ?ID} ), diff --git a/changes/ce/feat-10910.en.md b/changes/ce/feat-10910.en.md new file mode 100644 index 000000000..40373e1a1 --- /dev/null +++ b/changes/ce/feat-10910.en.md @@ -0,0 +1,3 @@ +The data bridge resource option `auto_restart_interval` was deprecated in favor of `health_check_interval`, and `request_timeout` was renamed to `request_ttl`. Also, the default `request_ttl` value went from 15 seconds to 45 seconds. + +The previous existence of both `auto_restart_interval` and `health_check_interval` was a source of confusion, as both parameters influenced the recovery of data bridges under failures. An inconsistent configuration of those two parameters could lead to messages being expired without a chance to retry. Now, `health_check_interval` is used both to control the periodicity of health checks that may transition the data bridge into `disconnected` or `connecting` states, as well as recovering from `disconnected`. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 7914c77e2..4d041135f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -53,7 +53,6 @@ values(_Method) -> resource_opts => #{ worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 707aa47ea..4af180a2f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -178,7 +178,7 @@ mysql_config(BridgeType, Config) -> " pool_size = ~b\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 56f932aba..c9dc402bd 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -554,7 +554,7 @@ resource_configs(#{query_mode := QueryMode}) -> <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"start_timeout">> => <<"15s">>, <<"batch_time">> => <<"4s">>, - <<"request_timeout">> => <<"30s">> + <<"request_ttl">> => <<"30s">> } }. diff --git a/rel/i18n/emqx_bridge_gcp_pubsub.hocon b/rel/i18n/emqx_bridge_gcp_pubsub.hocon index cc255aec3..ca6b855dc 100644 --- a/rel/i18n/emqx_bridge_gcp_pubsub.hocon +++ b/rel/i18n/emqx_bridge_gcp_pubsub.hocon @@ -64,12 +64,6 @@ pubsub_topic.desc: pubsub_topic.label: """GCP PubSub Topic""" -request_timeout.desc: -"""Deprecated: Configure the request timeout in the buffer settings.""" - -request_timeout.label: -"""Request Timeout""" - service_account_json.desc: """JSON containing the GCP Service Account credentials to be used with PubSub. When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed.""" diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 8fc781794..cb26e4182 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -1,11 +1,5 @@ emqx_resource_schema { -auto_restart_interval.desc: -"""The auto restart interval after the resource is disconnected.""" - -auto_restart_interval.label: -"""Auto Restart Interval""" - batch_size.desc: """Maximum batch count. If equal to 1, there's effectively no batching.""" @@ -76,11 +70,11 @@ query_mode.desc: query_mode.label: """Query mode""" -request_timeout.desc: +request_ttl.desc: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" -request_timeout.label: -"""Request Expiry""" +request_ttl.label: +"""Request TTL""" resource_opts.desc: """Resource options.""" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 0574ac38a..08013185a 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -7,13 +7,12 @@ bridges { precision = "ms" resource_opts { inflight_window = 100 - auto_restart_interval = "60s" batch_size = 100 batch_time = "10ms" health_check_interval = "15s" max_buffer_bytes = "1GB" query_mode = "sync" - request_timeout = "15s" + request_ttl = "15s" start_after_created = "true" start_timeout = "5s" worker_pool_size = 4