refactor(resource): rename `request_timeout` -> `request_ttl`

See: https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options
Author: Thales Macedo Garitezi
Date:   2023-06-01 10:18:08 -03:00
parent f42ccb6262
commit 99796224d8
29 changed files with 81 additions and 86 deletions
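For bridge and resource configs this is a key rename under `resource_opts`; the old spelling keeps loading through a schema alias (see the `request_ttl(aliases) -> [request_timeout]` change further down). A hedged before/after sketch in raw Erlang map form, with an illustrative value:

%% before (deprecated key, still accepted via the alias):
#{<<"resource_opts">> => #{<<"request_timeout">> => <<"45s">>}}
%% after:
#{<<"resource_opts">> => #{<<"request_ttl">> => <<"45s">>}}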

View File

@@ -216,9 +216,9 @@ send_message(BridgeType, BridgeName, ResId, Message) ->
     end.
 
 query_opts(Config) ->
-    case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
+    case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
         Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
-            %% request_timeout is configured
+            %% request_ttl is configured
             #{timeout => Timeout};
         _ ->
             %% emqx_resource has a default value (15s)

View File

@@ -327,8 +327,8 @@ parse_confs(
                 Reason1 = emqx_utils:readable_error_msg(Reason),
                 invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
         end,
-    RequestTimeout = emqx_utils_maps:deep_get(
-        [resource_opts, request_timeout],
+    RequestTTL = emqx_utils_maps:deep_get(
+        [resource_opts, request_ttl],
         Conf
     ),
     Conf#{
@@ -339,7 +339,7 @@ parse_confs(
             method => Method,
             body => maps:get(body, Conf, undefined),
             headers => Headers,
-            request_timeout => RequestTimeout,
+            request_ttl => RequestTTL,
             max_retries => Retry
         }
     };

View File

@@ -1300,7 +1300,7 @@ t_metrics(Config) ->
     ),
     ok.
 
-%% request_timeout in bridge root should match request_timeout in
+%% request_timeout in bridge root should match request_ttl in
 %% resource_opts.
 t_inconsistent_webhook_request_timeouts(Config) ->
     Port = ?config(port, Config),
@@ -1311,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
         ?HTTP_BRIDGE(URL1, Name),
         #{
             <<"request_timeout">> => <<"1s">>,
-            <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
+            <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
         }
     ),
     %% root request_timeout is deprecated for bridge.
@@ -1326,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) ->
         Config
     ),
     ?assertNot(maps:is_key(<<"request_timeout">>, Response)),
-    ?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
-    validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
+    ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
+    validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
     ok.
 
 t_cluster_later_join_metrics(Config) ->
@@ -1368,7 +1368,7 @@ t_cluster_later_join_metrics(Config) ->
     ),
     ok.
 
-validate_resource_request_timeout(single, Timeout, Name) ->
+validate_resource_request_ttl(single, Timeout, Name) ->
     SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
     ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
@@ -1388,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) ->
             ok
         end
     );
-validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
+validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
     ignore.
 
 %%

View File

@@ -71,7 +71,7 @@ webhook_config_test() ->
             }
         }
     } = check(Conf3),
-    ?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
+    ?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
     ok.
 
 up(#{<<"bridges">> := Bridges0} = Conf0) ->

View File

@@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) ->
     ConnectTimeout = maps:get(connect_timeout, Config, 1),
     RequestTimeout = maps:get(request_timeout, Config, 10000),
     ResumeInterval = maps:get(resume_interval, Config, "1s"),
-    ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"),
+    ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
     ConfigString = io_lib:format(
         "bridges.~s.~s {\n"
         " url = \"http://localhost:~p\"\n"
@@ -185,7 +185,7 @@ bridge_async_config(#{port := Port} = Config) ->
         " health_check_interval = \"15s\"\n"
         " max_buffer_bytes = \"1GB\"\n"
         " query_mode = \"~s\"\n"
-        " request_timeout = \"~p\"\n"
+        " request_ttl = \"~p\"\n"
         " resume_interval = \"~s\"\n"
         " start_after_created = \"true\"\n"
         " start_timeout = \"5s\"\n"
@@ -203,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) ->
            PoolSize,
            RequestTimeout,
            QueryMode,
-           ResourceRequestTimeout,
+           ResourceRequestTTL,
            ResumeInterval
        ]
    ),
@@ -246,7 +246,7 @@ t_send_async_connection_timeout(_Config) ->
        query_mode => "async",
        connect_timeout => ResponseDelayMS * 2,
        request_timeout => 10000,
-       resource_request_timeout => "infinity"
+       resource_request_ttl => "infinity"
    }),
    NumberOfMessagesToSend = 10,
    [
@@ -268,7 +268,7 @@ t_async_free_retries(_Config) ->
        query_mode => "sync",
        connect_timeout => 1_000,
        request_timeout => 10_000,
-       resource_request_timeout => "10000s"
+       resource_request_ttl => "10000s"
    }),
    %% Fail 5 times then succeed.
    Context = #{error_attempts => 5},
@@ -294,7 +294,7 @@ t_async_common_retries(_Config) ->
        resume_interval => "100ms",
        connect_timeout => 1_000,
        request_timeout => 10_000,
-       resource_request_timeout => "10000s"
+       resource_request_ttl => "10000s"
    }),
    %% Keeps failing until connector gives up.
    Context = #{error_attempts => infinity},

View File

@@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) ->
         " password = ~p\n"
         " cql = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " }\n"
@@ -635,7 +635,7 @@ t_bad_sql_parameter(Config) ->
        Config,
        #{
            <<"resource_opts">> => #{
-               <<"request_timeout">> => 500,
+               <<"request_ttl">> => 500,
                <<"resume_interval">> => 100,
                <<"health_check_interval">> => 100
            }

View File

@@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) ->
         " aws_access_key_id = ~p\n"
         " aws_secret_access_key = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " }\n"

View File

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

View File

@@ -32,7 +32,7 @@
     connect_timeout := emqx_schema:duration_ms(),
     max_retries := non_neg_integer(),
     pubsub_topic := binary(),
-    resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()},
+    resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
     service_account_json := service_account_json(),
     any() => term()
 }.
@@ -44,7 +44,7 @@
     pool_name := binary(),
     project_id := binary(),
     pubsub_topic := binary(),
-    request_timeout := timer:time()
+    request_ttl := infinity | timer:time()
 }.
 -type headers() :: [{binary(), iodata()}].
 -type body() :: iodata().
@@ -69,7 +69,7 @@ on_start(
         payload_template := PayloadTemplate,
         pool_size := PoolSize,
         pubsub_topic := PubSubTopic,
-        resource_opts := #{request_timeout := RequestTimeout}
+        resource_opts := #{request_ttl := RequestTTL}
     } = Config
 ) ->
     ?SLOG(info, #{
@@ -108,7 +108,7 @@ on_start(
         pool_name => ResourceId,
         project_id => ProjectId,
         pubsub_topic => PubSubTopic,
-        request_timeout => RequestTimeout
+        request_ttl => RequestTTL
     },
     ?tp(
         gcp_pubsub_on_start_before_starting_pool,
@@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
     #{
         pool_name := PoolName,
         max_retries := MaxRetries,
-        request_timeout := RequestTimeout
+        request_ttl := RequestTTL
     } = State,
     ?tp(
         gcp_pubsub_bridge_do_send_requests,
@@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
            PoolName,
            Method,
            Request,
-           RequestTimeout,
+           RequestTTL,
            MaxRetries
        )
    of
@@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
 do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
     #{
         pool_name := PoolName,
-        request_timeout := RequestTimeout
+        request_ttl := RequestTTL
     } = State,
     ?tp(
         gcp_pubsub_bridge_do_send_requests,
@@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
        Worker,
        Method,
        Request,
-       RequestTimeout,
+       RequestTTL,
        {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
    ),
    {ok, Worker}.

View File

@@ -287,7 +287,7 @@ gcp_pubsub_config(Config) ->
         " pool_size = 1\n"
         " pipelining = ~b\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " metrics_flush_interval = 700ms\n"
         " worker_pool_size = 1\n"
         " query_mode = ~s\n"
@@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) ->
     ServiceAccountJSON = ?config(service_account_json, Config),
     Topic = <<"t/topic">>,
     {ok, _} = create_bridge(Config, #{
-        <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>}
+        <<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>}
     }),
     {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
     on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),

View File

@@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
         " precision = ns\n"
         " write_syntax = \"~s\"\n"
         " resource_opts = {\n"
-        " request_timeout = 1s\n"
+        " request_ttl = 1s\n"
         " query_mode = ~s\n"
         " batch_size = ~b\n"
         " }\n"
@@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
         " precision = ns\n"
         " write_syntax = \"~s\"\n"
         " resource_opts = {\n"
-        " request_timeout = 1s\n"
+        " request_ttl = 1s\n"
         " query_mode = ~s\n"
         " batch_size = ~b\n"
         " }\n"

View File

@@ -102,7 +102,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
         " pool_size = 1\n"
         " resource_opts = {\n"
         " health_check_interval = 5000\n"
-        " request_timeout = 30000\n"
+        " request_ttl = 30000\n"
         " query_mode = \"async\"\n"
         " worker_pool_size = 1\n"
         " }\n"

View File

@@ -755,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
            <<"worker_pool_size">> => 2,
            <<"query_mode">> => <<"sync">>,
            %% using a long time so we can test recovery
-           <<"request_timeout">> => <<"15s">>,
+           <<"request_ttl">> => <<"15s">>,
            %% to make it check the healthy and reconnect quickly
            <<"health_check_interval">> => <<"0.5s">>
        }
@@ -863,7 +863,7 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
            <<"worker_pool_size">> => 2,
            <<"query_mode">> => <<"async">>,
            %% using a long time so we can test recovery
-           <<"request_timeout">> => <<"15s">>,
+           <<"request_ttl">> => <<"15s">>,
            %% to make it check the healthy and reconnect quickly
            <<"health_check_interval">> => <<"0.5s">>
        }

View File

@@ -127,7 +127,7 @@ opents_config(BridgeType, Config) ->
         " enable = true\n"
         " server = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = sync\n"
         " }\n"
@@ -298,7 +298,7 @@ t_write_timeout(Config) ->
        Config,
        #{
            <<"resource_opts">> => #{
-               <<"request_timeout">> => 500,
+               <<"request_ttl">> => 500,
                <<"resume_interval">> => 100,
                <<"health_check_interval">> => 100
            }

View File

@@ -204,7 +204,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
         " sql = \"~s\"\n"
         " resource_opts = {\n"
         " health_check_interval = \"5s\"\n"
-        " request_timeout = \"30s\"\n"
+        " request_ttl = \"30s\"\n"
         " query_mode = \"async\"\n"
         " batch_size = 3\n"
         " batch_time = \"3s\"\n"

View File

@@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) ->
         " password = ~p\n"
         " sql = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " }\n"

View File

@@ -139,7 +139,7 @@ rocketmq_config(BridgeType, Config) ->
         " servers = ~p\n"
         " topic = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 1500ms\n"
+        " request_ttl = 1500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " }\n"

View File

@@ -55,8 +55,8 @@
     default_port => ?SQLSERVER_DEFAULT_PORT
 }).
 
--define(REQUEST_TIMEOUT(RESOURCE_OPTS),
-    maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT)
+-define(REQUEST_TTL(RESOURCE_OPTS),
+    maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
 ).
 
 -define(BATCH_INSERT_TEMP, batch_insert_temp).
@@ -388,7 +388,7 @@ worker_do_insert(
 ) ->
     LogMeta = #{connector => ResourceId, state => State},
     try
-        case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of
+        case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
            {selected, Rows, _} ->
                {ok, Rows};
            {updated, _} ->

View File

@@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) ->
         " sql = ~p\n"
         " driver = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " worker_pool_size = ~b\n"

View File

@@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) ->
         " password = ~p\n"
         " sql = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " }\n"
@@ -456,7 +456,7 @@ t_write_timeout(Config) ->
        Config,
        #{
            <<"resource_opts">> => #{
-               <<"request_timeout">> => 500,
+               <<"request_ttl">> => 500,
                <<"resume_interval">> => 100,
                <<"health_check_interval">> => 100
            }

View File

@@ -86,8 +86,8 @@
 -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).
 -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>).
 
--define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(45)).
--define(DEFAULT_REQUEST_TIMEOUT_RAW, <<"45s">>).
+-define(DEFAULT_REQUEST_TTL, timer:seconds(45)).
+-define(DEFAULT_REQUEST_TTL_RAW, <<"45s">>).
 
 %% count
 -define(DEFAULT_BATCH_SIZE, 1).

View File

@@ -76,7 +76,7 @@
 -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
 -type request() :: term().
 -type request_from() :: undefined | gen_statem:from().
--type request_timeout() :: infinity | timer:time().
+-type request_ttl() :: infinity | timer:time().
 -type health_check_interval() :: timer:time().
 -type state() :: blocked | running.
 -type inflight_key() :: integer().
@@ -187,10 +187,10 @@ init({Id, Index, Opts}) ->
     InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightTID = inflight_new(InflightWinSize),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
-    RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
+    RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL),
     BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
-    BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
-    DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval),
+    BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0),
+    DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval),
     ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
     MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
     Data0 = #{
@@ -1733,7 +1733,7 @@ now_() ->
 
 ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
     Opts;
 ensure_timeout_query_opts(#{} = Opts0, sync) ->
-    Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT};
+    Opts0#{timeout => ?DEFAULT_REQUEST_TTL};
 ensure_timeout_query_opts(#{} = Opts0, async) ->
     Opts0#{timeout => infinity}.
@@ -1760,14 +1760,14 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent,
 -endif.
 
 %% To avoid message loss due to misconfigurations, we adjust
-%% `batch_time' based on `request_timeout'. If `batch_time' >
-%% `request_timeout', all requests will timeout before being sent if
+%% `batch_time' based on `request_ttl'. If `batch_time' >
+%% `request_ttl', all requests will timeout before being sent if
 %% the message rate is low. Even worse if `pool_size' is high.
-%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
-adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) ->
+%% We cap `batch_time' at `request_ttl div 2' as a rule of thumb.
+adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) ->
     BatchTime0;
-adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
-    BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
+adjust_batch_time(Id, RequestTTL, BatchTime0) ->
+    BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
     case BatchTime =:= BatchTime0 of
         false ->
             ?SLOG(info, #{
@@ -1811,11 +1811,11 @@ replayq_opts(Id, Index, Opts) ->
 %% timeout is <= resume interval and the buffer worker is ever
 %% blocked, than all queued requests will basically fail without being
 %% attempted.
--spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time().
-default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
+-spec default_resume_interval(request_ttl(), health_check_interval()) -> timer:time().
+default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) ->
     max(1, HealthCheckInterval);
-default_resume_interval(RequestTimeout, HealthCheckInterval) ->
-    max(1, min(HealthCheckInterval, RequestTimeout div 3)).
+default_resume_interval(RequestTTL, HealthCheckInterval) ->
+    max(1, min(HealthCheckInterval, RequestTTL div 3)).
 
 -spec reply_call(reference(), term()) -> ok.
 reply_call(Alias, Response) ->
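The comments in the hunks above state the two sizing caps; a small worked sketch with illustrative numbers (assumed values, not taken from this commit) makes the arithmetic concrete:

%% Illustrative only: batch_time is capped at request_ttl div 2, and the
%% default resume_interval is min(health_check_interval, request_ttl div 3).
RequestTTL = 45_000,
HealthCheckInterval = 15_000,
BatchTime0 = 60_000,  %% misconfigured: larger than request_ttl
BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
%% -> 22_500: the 60s batch_time is clamped so requests can still be sent in time
ResumeInterval = max(1, min(HealthCheckInterval, RequestTTL div 3)).
%% -> 15_000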

View File

@@ -53,7 +53,7 @@ fields("creation_opts") ->
         {start_timeout, fun start_timeout/1},
         {auto_restart_interval, fun auto_restart_interval/1},
         {query_mode, fun query_mode/1},
-        {request_timeout, fun request_timeout/1},
+        {request_ttl, fun request_ttl/1},
         {inflight_window, fun inflight_window/1},
         {enable_batch, fun enable_batch/1},
         {batch_size, fun batch_size/1},
@@ -133,10 +133,11 @@ query_mode(default) -> async;
 query_mode(required) -> false;
 query_mode(_) -> undefined.
 
-request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
-request_timeout(desc) -> ?DESC("request_timeout");
-request_timeout(default) -> ?DEFAULT_REQUEST_TIMEOUT_RAW;
-request_timeout(_) -> undefined.
+request_ttl(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
+request_ttl(aliases) -> [request_timeout];
+request_ttl(desc) -> ?DESC("request_ttl");
+request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW;
+request_ttl(_) -> undefined.
 
 enable_batch(type) -> boolean();
 enable_batch(required) -> false;
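The added `request_ttl(aliases) -> [request_timeout]` line is what keeps existing configs loading: the schema check is expected to resolve the legacy key to the new field. A hedged sketch of the intended equivalence (illustrative raw maps, not output produced by this commit):

%% Assumption based on the alias above; values are illustrative. Both raw
%% inputs should validate to the same checked field:
LegacyRaw = #{<<"resource_opts">> => #{<<"request_timeout">> => <<"45s">>}},
NewRaw = #{<<"resource_opts">> => #{<<"request_ttl">> => <<"45s">>}},
%% expected checked form for either input:
%% #{resource_opts => #{request_ttl => 45000}}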

View File

@@ -2823,7 +2823,7 @@ t_volatile_offload_mode(_Config) ->
 
 t_late_call_reply(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    RequestTimeout = 500,
+    RequestTTL = 500,
     ?assertMatch(
         {ok, _},
         emqx_resource:create(
@@ -2833,7 +2833,7 @@ t_late_call_reply(_Config) ->
            #{name => test_resource},
            #{
                buffer_mode => memory_only,
-               request_timeout => RequestTimeout,
+               request_ttl => RequestTTL,
                query_mode => sync
            }
        )
@@ -2844,13 +2844,13 @@ t_late_call_reply(_Config) ->
     %% have been already returned (a timeout), but the resource will
     %% still send a message with the reply.
     %% The demo connector will reply with `{error, timeout}' after 1 s.
-    SleepFor = RequestTimeout + 500,
+    SleepFor = RequestTTL + 500,
     ?assertMatch(
         {error, {resource_error, #{reason := timeout}}},
         emqx_resource:query(
            ?ID,
            {sync_sleep_before_reply, SleepFor},
-           #{timeout => RequestTimeout}
+           #{timeout => RequestTTL}
        )
     ),
     %% Our process shouldn't receive any late messages.

View File

@@ -178,7 +178,7 @@ mysql_config(BridgeType, Config) ->
         " pool_size = ~b\n"
         " sql = ~p\n"
         " resource_opts = {\n"
-        " request_timeout = 500ms\n"
+        " request_ttl = 500ms\n"
         " batch_size = ~b\n"
         " query_mode = ~s\n"
         " worker_pool_size = ~b\n"

View File

@@ -554,7 +554,7 @@ resource_configs(#{query_mode := QueryMode}) ->
         <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
         <<"start_timeout">> => <<"15s">>,
         <<"batch_time">> => <<"4s">>,
-        <<"request_timeout">> => <<"30s">>
+        <<"request_ttl">> => <<"30s">>
     }
 }.

View File

@@ -64,12 +64,6 @@ pubsub_topic.desc:
 pubsub_topic.label:
 """GCP PubSub Topic"""
 
-request_timeout.desc:
-"""Deprecated: Configure the request timeout in the buffer settings."""
-
-request_timeout.label:
-"""Request Timeout"""
-
 service_account_json.desc:
 """JSON containing the GCP Service Account credentials to be used with PubSub.
 When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed."""

View File

@@ -70,11 +70,11 @@ query_mode.desc:
 query_mode.label:
 """Query mode"""
 
-request_timeout.desc:
+request_ttl.desc:
 """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
 
-request_timeout.label:
-"""Request Expiry"""
+request_ttl.label:
+"""Request TTL"""
 
 resource_opts.desc:
 """Resource options."""

View File

@@ -12,7 +12,7 @@ bridges {
       health_check_interval = "15s"
       max_buffer_bytes = "1GB"
       query_mode = "sync"
-      request_timeout = "15s"
+      request_ttl = "15s"
       start_after_created = "true"
       start_timeout = "5s"
       worker_pool_size = 4