diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 46c822ed0..591dd9e0d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -216,9 +216,9 @@ send_message(BridgeType, BridgeName, ResId, Message) -> end. query_opts(Config) -> - case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of + case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> - %% request_timeout is configured + %% request_ttl is configured #{timeout => Timeout}; _ -> %% emqx_resource has a default value (15s) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index eee39bd56..d7234a6bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -327,8 +327,8 @@ parse_confs( Reason1 = emqx_utils:readable_error_msg(Reason), invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) end, - RequestTimeout = emqx_utils_maps:deep_get( - [resource_opts, request_timeout], + RequestTTL = emqx_utils_maps:deep_get( + [resource_opts, request_ttl], Conf ), Conf#{ @@ -339,7 +339,7 @@ parse_confs( method => Method, body => maps:get(body, Conf, undefined), headers => Headers, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, max_retries => Retry } }; diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index bbab0a09a..2f5d65259 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1300,7 +1300,7 @@ t_metrics(Config) -> ), ok. -%% request_timeout in bridge root should match request_timeout in +%% request_timeout in bridge root should match request_ttl in %% resource_opts. t_inconsistent_webhook_request_timeouts(Config) -> Port = ?config(port, Config), @@ -1311,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) -> ?HTTP_BRIDGE(URL1, Name), #{ <<"request_timeout">> => <<"1s">>, - <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>} } ), %% root request_timeout is deprecated for bridge. @@ -1326,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) -> Config ), ?assertNot(maps:is_key(<<"request_timeout">>, Response)), - ?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts), - validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name), + ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts), + validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name), ok. t_cluster_later_join_metrics(Config) -> @@ -1368,7 +1368,7 @@ t_cluster_later_join_metrics(Config) -> ), ok. -validate_resource_request_timeout(single, Timeout, Name) -> +validate_resource_request_ttl(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), @@ -1388,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) -> ok end ); -validate_resource_request_timeout(_Cluster, _Timeout, _Name) -> +validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> ignore. %% diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 7c8ee4f4d..540c18878 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -71,7 +71,7 @@ webhook_config_test() -> } } } = check(Conf3), - ?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts), + ?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts), ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index ac98a08d7..4fc76fc9e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) -> ConnectTimeout = maps:get(connect_timeout, Config, 1), RequestTimeout = maps:get(request_timeout, Config, 10000), ResumeInterval = maps:get(resume_interval, Config, "1s"), - ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"), + ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ConfigString = io_lib:format( "bridges.~s.~s {\n" " url = \"http://localhost:~p\"\n" @@ -185,7 +185,7 @@ bridge_async_config(#{port := Port} = Config) -> " health_check_interval = \"15s\"\n" " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" - " request_timeout = \"~p\"\n" + " request_ttl = \"~p\"\n" " resume_interval = \"~s\"\n" " start_after_created = \"true\"\n" " start_timeout = \"5s\"\n" @@ -203,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) -> PoolSize, RequestTimeout, QueryMode, - ResourceRequestTimeout, + ResourceRequestTTL, ResumeInterval ] ), @@ -246,7 +246,7 @@ t_send_async_connection_timeout(_Config) -> query_mode => "async", connect_timeout => ResponseDelayMS * 2, request_timeout => 10000, - resource_request_timeout => "infinity" + resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, [ @@ -268,7 +268,7 @@ t_async_free_retries(_Config) -> query_mode => "sync", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Fail 5 times then succeed. Context = #{error_attempts => 5}, @@ -294,7 +294,7 @@ t_async_common_retries(_Config) -> resume_interval => "100ms", connect_timeout => 1_000, request_timeout => 10_000, - resource_request_timeout => "10000s" + resource_request_ttl => "10000s" }), %% Keeps failing until connector gives up. Context = #{error_attempts => infinity}, diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 307ef8cdf..5525a640c 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) -> " password = ~p\n" " cql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -635,7 +635,7 @@ t_bad_sql_parameter(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl index da87f6047..ac2b59229 100644 --- a/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl +++ b/apps/emqx_bridge_dynamo/test/emqx_bridge_dynamo_SUITE.erl @@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) -> " aws_access_key_id = ~p\n" " aws_secret_access_key = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 2b3d359d3..f7401d423 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index be5e56e85..d2f6bbee4 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -32,7 +32,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -44,7 +44,7 @@ pool_name := binary(), project_id := binary(), pubsub_topic := binary(), - request_timeout := timer:time() + request_ttl := infinity | timer:time() }. -type headers() :: [{binary(), iodata()}]. -type body() :: iodata(). @@ -69,7 +69,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - resource_opts := #{request_timeout := RequestTimeout} + resource_opts := #{request_ttl := RequestTTL} } = Config ) -> ?SLOG(info, #{ @@ -108,7 +108,7 @@ on_start( pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, - request_timeout => RequestTimeout + request_ttl => RequestTTL }, ?tp( gcp_pubsub_on_start_before_starting_pool, @@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> #{ pool_name := PoolName, max_retries := MaxRetries, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> PoolName, Method, Request, - RequestTimeout, + RequestTTL, MaxRetries ) of @@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, - request_timeout := RequestTimeout + request_ttl := RequestTTL } = State, ?tp( gcp_pubsub_bridge_do_send_requests, @@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> Worker, Method, Request, - RequestTimeout, + RequestTTL, {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} ), {ok, Worker}. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 65b88d45b..b5f22f727 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -287,7 +287,7 @@ gcp_pubsub_config(Config) -> " pool_size = 1\n" " pipelining = ~b\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " metrics_flush_interval = 700ms\n" " worker_pool_size = 1\n" " query_mode = ~s\n" @@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) -> ServiceAccountJSON = ?config(service_account_json, Config), Topic = <<"t/topic">>, {ok, _} = create_bridge(Config, #{ - <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} + <<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>} }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 825721052..8421f4e21 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " request_timeout = 1s\n" + " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 9a868c900..2c24466df 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -102,7 +102,7 @@ bridge_config(TestCase, _TestGroup, Config) -> " pool_size = 1\n" " resource_opts = {\n" " health_check_interval = 5000\n" - " request_timeout = 30000\n" + " request_ttl = 30000\n" " query_mode = \"async\"\n" " worker_pool_size = 1\n" " }\n" diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index b44720400..d14e0597e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -755,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, + <<"request_ttl">> => <<"15s">>, %% to make it check the healthy and reconnect quickly <<"health_check_interval">> => <<"0.5s">> } @@ -863,7 +863,7 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"worker_pool_size">> => 2, <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery - <<"request_timeout">> => <<"15s">>, + <<"request_ttl">> => <<"15s">>, %% to make it check the healthy and reconnect quickly <<"health_check_interval">> => <<"0.5s">> } diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 6f444b93e..3563e0774 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -127,7 +127,7 @@ opents_config(BridgeType, Config) -> " enable = true\n" " server = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = sync\n" " }\n" @@ -298,7 +298,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 119817aaf..48d784633 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -204,7 +204,7 @@ oracle_config(TestCase, _ConnectionType, Config) -> " sql = \"~s\"\n" " resource_opts = {\n" " health_check_interval = \"5s\"\n" - " request_timeout = \"30s\"\n" + " request_ttl = \"30s\"\n" " query_mode = \"async\"\n" " batch_size = 3\n" " batch_time = \"3s\"\n" diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index eca841f11..4e7da85bd 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 90047e577..202c9cc4d 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -139,7 +139,7 @@ rocketmq_config(BridgeType, Config) -> " servers = ~p\n" " topic = ~p\n" " resource_opts = {\n" - " request_timeout = 1500ms\n" + " request_ttl = 1500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index ed8134051..72962229a 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -55,8 +55,8 @@ default_port => ?SQLSERVER_DEFAULT_PORT }). --define(REQUEST_TIMEOUT(RESOURCE_OPTS), - maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT) +-define(REQUEST_TTL(RESOURCE_OPTS), + maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL) ). -define(BATCH_INSERT_TEMP, batch_insert_temp). @@ -388,7 +388,7 @@ worker_do_insert( ) -> LogMeta = #{connector => ResourceId, state => State}, try - case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of + case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of {selected, Rows, _} -> {ok, Rows}; {updated, _} -> diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index fcf20da8f..0e60e9c97 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) -> " sql = ~p\n" " driver = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 0b8d20f15..55f8bb69e 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -456,7 +456,7 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, + <<"request_ttl">> => 500, <<"resume_interval">> => 100, <<"health_check_interval">> => 100 } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 32d1e94d0..562e18d52 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -86,8 +86,8 @@ -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(45)). --define(DEFAULT_REQUEST_TIMEOUT_RAW, <<"45s">>). +-define(DEFAULT_REQUEST_TTL, timer:seconds(45)). +-define(DEFAULT_REQUEST_TTL_RAW, <<"45s">>). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 167dcc02e..7a0bcaea9 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -76,7 +76,7 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). --type request_timeout() :: infinity | timer:time(). +-type request_ttl() :: infinity | timer:time(). -type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). @@ -187,10 +187,10 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), - RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), - BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), - DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0), + DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval), ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL), Data0 = #{ @@ -1733,7 +1733,7 @@ now_() -> ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> Opts; ensure_timeout_query_opts(#{} = Opts0, sync) -> - Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT}; + Opts0#{timeout => ?DEFAULT_REQUEST_TTL}; ensure_timeout_query_opts(#{} = Opts0, async) -> Opts0#{timeout => infinity}. @@ -1760,14 +1760,14 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, -endif. %% To avoid message loss due to misconfigurations, we adjust -%% `batch_time' based on `request_timeout'. If `batch_time' > -%% `request_timeout', all requests will timeout before being sent if +%% `batch_time' based on `request_ttl'. If `batch_time' > +%% `request_ttl', all requests will timeout before being sent if %% the message rate is low. Even worse if `pool_size' is high. -%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. -adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> +%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) -> BatchTime0; -adjust_batch_time(Id, RequestTimeout, BatchTime0) -> - BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), +adjust_batch_time(Id, RequestTTL, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTTL div 2)), case BatchTime =:= BatchTime0 of false -> ?SLOG(info, #{ @@ -1811,11 +1811,11 @@ replayq_opts(Id, Index, Opts) -> %% timeout is <= resume interval and the buffer worker is ever %% blocked, than all queued requests will basically fail without being %% attempted. --spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). -default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> +-spec default_resume_interval(request_ttl(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) -> max(1, HealthCheckInterval); -default_resume_interval(RequestTimeout, HealthCheckInterval) -> - max(1, min(HealthCheckInterval, RequestTimeout div 3)). +default_resume_interval(RequestTTL, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTTL div 3)). -spec reply_call(reference(), term()) -> ok. reply_call(Alias, Response) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 713af5bc9..f427fbbd0 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -53,7 +53,7 @@ fields("creation_opts") -> {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, - {request_timeout, fun request_timeout/1}, + {request_ttl, fun request_ttl/1}, {inflight_window, fun inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -133,10 +133,11 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. -request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); -request_timeout(desc) -> ?DESC("request_timeout"); -request_timeout(default) -> ?DEFAULT_REQUEST_TIMEOUT_RAW; -request_timeout(_) -> undefined. +request_ttl(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); +request_ttl(aliases) -> [request_timeout]; +request_ttl(desc) -> ?DESC("request_ttl"); +request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW; +request_ttl(_) -> undefined. enable_batch(type) -> boolean(); enable_batch(required) -> false; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5b07523aa..5883614aa 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2823,7 +2823,7 @@ t_volatile_offload_mode(_Config) -> t_late_call_reply(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), - RequestTimeout = 500, + RequestTTL = 500, ?assertMatch( {ok, _}, emqx_resource:create( @@ -2833,7 +2833,7 @@ t_late_call_reply(_Config) -> #{name => test_resource}, #{ buffer_mode => memory_only, - request_timeout => RequestTimeout, + request_ttl => RequestTTL, query_mode => sync } ) @@ -2844,13 +2844,13 @@ t_late_call_reply(_Config) -> %% have been already returned (a timeout), but the resource will %% still send a message with the reply. %% The demo connector will reply with `{error, timeout}' after 1 s. - SleepFor = RequestTimeout + 500, + SleepFor = RequestTTL + 500, ?assertMatch( {error, {resource_error, #{reason := timeout}}}, emqx_resource:query( ?ID, {sync_sleep_before_reply, SleepFor}, - #{timeout => RequestTimeout} + #{timeout => RequestTTL} ) ), %% Our process shouldn't receive any late messages. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 707aa47ea..4af180a2f 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -178,7 +178,7 @@ mysql_config(BridgeType, Config) -> " pool_size = ~b\n" " sql = ~p\n" " resource_opts = {\n" - " request_timeout = 500ms\n" + " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " worker_pool_size = ~b\n" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 56f932aba..c9dc402bd 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -554,7 +554,7 @@ resource_configs(#{query_mode := QueryMode}) -> <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"start_timeout">> => <<"15s">>, <<"batch_time">> => <<"4s">>, - <<"request_timeout">> => <<"30s">> + <<"request_ttl">> => <<"30s">> } }. diff --git a/rel/i18n/emqx_bridge_gcp_pubsub.hocon b/rel/i18n/emqx_bridge_gcp_pubsub.hocon index cc255aec3..ca6b855dc 100644 --- a/rel/i18n/emqx_bridge_gcp_pubsub.hocon +++ b/rel/i18n/emqx_bridge_gcp_pubsub.hocon @@ -64,12 +64,6 @@ pubsub_topic.desc: pubsub_topic.label: """GCP PubSub Topic""" -request_timeout.desc: -"""Deprecated: Configure the request timeout in the buffer settings.""" - -request_timeout.label: -"""Request Timeout""" - service_account_json.desc: """JSON containing the GCP Service Account credentials to be used with PubSub. When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed.""" diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index ccf8e37ac..cb26e4182 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -70,11 +70,11 @@ query_mode.desc: query_mode.label: """Query mode""" -request_timeout.desc: +request_ttl.desc: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" -request_timeout.label: -"""Request Expiry""" +request_ttl.label: +"""Request TTL""" resource_opts.desc: """Resource options.""" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index b14ecc2f1..08013185a 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -12,7 +12,7 @@ bridges { health_check_interval = "15s" max_buffer_bytes = "1GB" query_mode = "sync" - request_timeout = "15s" + request_ttl = "15s" start_after_created = "true" start_timeout = "5s" worker_pool_size = 4