Merge pull request #10910 from thalesmg/unify-restart-interval-v50

feat(resource): deprecate `auto_restart_interval` in favor of `health_check_interval`
This commit is contained in:
Thales Macedo Garitezi 2023-06-02 16:20:36 -03:00 committed by GitHub
commit 33aa879ad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 101 additions and 245 deletions

View File

@ -221,9 +221,9 @@ send_message(BridgeType, BridgeName, ResId, Message) ->
end. end.
query_opts(Config) -> query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_timeout is configured %% request_ttl is configured
#{timeout => Timeout}; #{timeout => Timeout};
_ -> _ ->
%% emqx_resource has a default value (15s) %% emqx_resource has a default value (15s)

View File

@ -218,7 +218,6 @@ info_example_basic(webhook) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => 15000, health_check_interval => 15000,
auto_restart_interval => 15000,
query_mode => async, query_mode => async,
inflight_window => 100, inflight_window => 100,
max_buffer_bytes => 100 * 1024 * 1024 max_buffer_bytes => 100 * 1024 * 1024
@ -244,7 +243,6 @@ mqtt_main_example() ->
max_inflight => 100, max_inflight => 100,
resource_opts => #{ resource_opts => #{
health_check_interval => <<"15s">>, health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>,
query_mode => sync, query_mode => sync,
max_buffer_bytes => 100 * 1024 * 1024 max_buffer_bytes => 100 * 1024 * 1024
}, },

View File

@ -327,8 +327,8 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason), Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end, end,
RequestTimeout = emqx_utils_maps:deep_get( RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_timeout], [resource_opts, request_ttl],
Conf Conf
), ),
Conf#{ Conf#{
@ -339,7 +339,7 @@ parse_confs(
method => Method, method => Method,
body => maps:get(body, Conf, undefined), body => maps:get(body, Conf, undefined),
headers => Headers, headers => Headers,
request_timeout => RequestTimeout, request_ttl => RequestTTL,
max_retries => Retry max_retries => Retry
} }
}; };

View File

@ -87,7 +87,6 @@ default_ssl() ->
default_resource_opts() -> default_resource_opts() ->
#{ #{
<<"inflight_window">> => 100, <<"inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
<<"max_buffer_bytes">> => <<"1GB">>, <<"max_buffer_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,

View File

@ -86,8 +86,7 @@ groups() ->
SingleOnlyTests = [ SingleOnlyTests = [
t_broken_bpapi_vsn, t_broken_bpapi_vsn,
t_old_bpapi_vsn, t_old_bpapi_vsn,
t_bridges_probe, t_bridges_probe
t_auto_restart_interval
], ],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics], ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[ [
@ -559,89 +558,6 @@ t_http_crud_apis(Config) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config). {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
t_auto_restart_interval(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:new(emqx_resource, [passthrough]),
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([#{}], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
%% auto_retry_interval=infinity
BridgeParams1 = BridgeParams#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams1,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:unload(emqx_resource).
t_http_bridges_local_topic(Config) -> t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config), Port = ?config(port, Config),
%% assert we there's no bridges at first %% assert we there's no bridges at first
@ -1384,7 +1300,7 @@ t_metrics(Config) ->
), ),
ok. ok.
%% request_timeout in bridge root should match request_timeout in %% request_timeout in bridge root should match request_ttl in
%% resource_opts. %% resource_opts.
t_inconsistent_webhook_request_timeouts(Config) -> t_inconsistent_webhook_request_timeouts(Config) ->
Port = ?config(port, Config), Port = ?config(port, Config),
@ -1395,7 +1311,7 @@ t_inconsistent_webhook_request_timeouts(Config) ->
?HTTP_BRIDGE(URL1, Name), ?HTTP_BRIDGE(URL1, Name),
#{ #{
<<"request_timeout">> => <<"1s">>, <<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>}
} }
), ),
%% root request_timeout is deprecated for bridge. %% root request_timeout is deprecated for bridge.
@ -1410,8 +1326,8 @@ t_inconsistent_webhook_request_timeouts(Config) ->
Config Config
), ),
?assertNot(maps:is_key(<<"request_timeout">>, Response)), ?assertNot(maps:is_key(<<"request_timeout">>, Response)),
?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts), ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts),
validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name), validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name),
ok. ok.
t_cluster_later_join_metrics(Config) -> t_cluster_later_join_metrics(Config) ->
@ -1452,7 +1368,7 @@ t_cluster_later_join_metrics(Config) ->
), ),
ok. ok.
validate_resource_request_timeout(single, Timeout, Name) -> validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
@ -1472,7 +1388,7 @@ validate_resource_request_timeout(single, Timeout, Name) ->
ok ok
end end
); );
validate_resource_request_timeout(_Cluster, _Timeout, _Name) -> validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore. ignore.
%% %%

View File

@ -71,7 +71,7 @@ webhook_config_test() ->
} }
} }
} = check(Conf3), } = check(Conf3),
?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts), ?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
ok. ok.
up(#{<<"bridges">> := Bridges0} = Conf0) -> up(#{<<"bridges">> := Bridges0} = Conf0) ->

View File

@ -167,7 +167,7 @@ bridge_async_config(#{port := Port} = Config) ->
ConnectTimeout = maps:get(connect_timeout, Config, 1), ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000), RequestTimeout = maps:get(request_timeout, Config, 10000),
ResumeInterval = maps:get(resume_interval, Config, "1s"), ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTimeout = maps:get(resource_request_timeout, Config, "infinity"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format( ConfigString = io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n" " url = \"http://localhost:~p\"\n"
@ -182,11 +182,10 @@ bridge_async_config(#{port := Port} = Config) ->
" body = \"${id}\"" " body = \"${id}\""
" resource_opts {\n" " resource_opts {\n"
" inflight_window = 100\n" " inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n" " health_check_interval = \"15s\"\n"
" max_buffer_bytes = \"1GB\"\n" " max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n" " query_mode = \"~s\"\n"
" request_timeout = \"~p\"\n" " request_ttl = \"~p\"\n"
" resume_interval = \"~s\"\n" " resume_interval = \"~s\"\n"
" start_after_created = \"true\"\n" " start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n" " start_timeout = \"5s\"\n"
@ -204,7 +203,7 @@ bridge_async_config(#{port := Port} = Config) ->
PoolSize, PoolSize,
RequestTimeout, RequestTimeout,
QueryMode, QueryMode,
ResourceRequestTimeout, ResourceRequestTTL,
ResumeInterval ResumeInterval
] ]
), ),
@ -247,7 +246,7 @@ t_send_async_connection_timeout(_Config) ->
query_mode => "async", query_mode => "async",
connect_timeout => ResponseDelayMS * 2, connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000, request_timeout => 10000,
resource_request_timeout => "infinity" resource_request_ttl => "infinity"
}), }),
NumberOfMessagesToSend = 10, NumberOfMessagesToSend = 10,
[ [
@ -269,7 +268,7 @@ t_async_free_retries(_Config) ->
query_mode => "sync", query_mode => "sync",
connect_timeout => 1_000, connect_timeout => 1_000,
request_timeout => 10_000, request_timeout => 10_000,
resource_request_timeout => "10000s" resource_request_ttl => "10000s"
}), }),
%% Fail 5 times then succeed. %% Fail 5 times then succeed.
Context = #{error_attempts => 5}, Context = #{error_attempts => 5},
@ -295,7 +294,7 @@ t_async_common_retries(_Config) ->
resume_interval => "100ms", resume_interval => "100ms",
connect_timeout => 1_000, connect_timeout => 1_000,
request_timeout => 10_000, request_timeout => 10_000,
resource_request_timeout => "10000s" resource_request_ttl => "10000s"
}), }),
%% Keeps failing until connector gives up. %% Keeps failing until connector gives up.
Context = #{error_attempts => infinity}, Context = #{error_attempts => infinity},

View File

@ -59,7 +59,6 @@ values(_Method, Type) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,

View File

@ -218,7 +218,7 @@ cassa_config(BridgeType, Config) ->
" password = ~p\n" " password = ~p\n"
" cql = ~p\n" " cql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
@ -511,7 +511,6 @@ t_write_failure(Config) ->
#{ #{
<<"resource_opts">> => <<"resource_opts">> =>
#{ #{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">> <<"health_check_interval">> => <<"100ms">>
} }
@ -636,7 +635,7 @@ t_bad_sql_parameter(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_timeout">> => 500, <<"request_ttl">> => 500,
<<"resume_interval">> => 100, <<"resume_interval">> => 100,
<<"health_check_interval">> => 100 <<"health_check_interval">> => 100
} }

View File

@ -56,7 +56,6 @@ values(_Method, Type) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -52,7 +52,6 @@ values(_Method) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,

View File

@ -170,7 +170,7 @@ dynamo_config(BridgeType, Config) ->
" aws_access_key_id = ~p\n" " aws_access_key_id = ~p\n"
" aws_secret_access_key = ~p\n" " aws_secret_access_key = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -32,7 +32,7 @@
connect_timeout := emqx_schema:duration_ms(), connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
pubsub_topic := binary(), pubsub_topic := binary(),
resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := service_account_json(), service_account_json := service_account_json(),
any() => term() any() => term()
}. }.
@ -44,7 +44,7 @@
pool_name := binary(), pool_name := binary(),
project_id := binary(), project_id := binary(),
pubsub_topic := binary(), pubsub_topic := binary(),
request_timeout := timer:time() request_ttl := infinity | timer:time()
}. }.
-type headers() :: [{binary(), iodata()}]. -type headers() :: [{binary(), iodata()}].
-type body() :: iodata(). -type body() :: iodata().
@ -69,7 +69,7 @@ on_start(
payload_template := PayloadTemplate, payload_template := PayloadTemplate,
pool_size := PoolSize, pool_size := PoolSize,
pubsub_topic := PubSubTopic, pubsub_topic := PubSubTopic,
resource_opts := #{request_timeout := RequestTimeout} resource_opts := #{request_ttl := RequestTTL}
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -108,7 +108,7 @@ on_start(
pool_name => ResourceId, pool_name => ResourceId,
project_id => ProjectId, project_id => ProjectId,
pubsub_topic => PubSubTopic, pubsub_topic => PubSubTopic,
request_timeout => RequestTimeout request_ttl => RequestTTL
}, },
?tp( ?tp(
gcp_pubsub_on_start_before_starting_pool, gcp_pubsub_on_start_before_starting_pool,
@ -344,7 +344,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
#{ #{
pool_name := PoolName, pool_name := PoolName,
max_retries := MaxRetries, max_retries := MaxRetries,
request_timeout := RequestTimeout request_ttl := RequestTTL
} = State, } = State,
?tp( ?tp(
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
@ -371,7 +371,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
PoolName, PoolName,
Method, Method,
Request, Request,
RequestTimeout, RequestTTL,
MaxRetries MaxRetries
) )
of of
@ -467,7 +467,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
#{ #{
pool_name := PoolName, pool_name := PoolName,
request_timeout := RequestTimeout request_ttl := RequestTTL
} = State, } = State,
?tp( ?tp(
gcp_pubsub_bridge_do_send_requests, gcp_pubsub_bridge_do_send_requests,
@ -494,7 +494,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
Worker, Worker,
Method, Method,
Request, Request,
RequestTimeout, RequestTTL,
{fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
), ),
{ok, Worker}. {ok, Worker}.

View File

@ -287,7 +287,7 @@ gcp_pubsub_config(Config) ->
" pool_size = 1\n" " pool_size = 1\n"
" pipelining = ~b\n" " pipelining = ~b\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" metrics_flush_interval = 700ms\n" " metrics_flush_interval = 700ms\n"
" worker_pool_size = 1\n" " worker_pool_size = 1\n"
" query_mode = ~s\n" " query_mode = ~s\n"
@ -627,7 +627,7 @@ t_publish_success_infinity_timeout(Config) ->
ServiceAccountJSON = ?config(service_account_json, Config), ServiceAccountJSON = ?config(service_account_json, Config),
Topic = <<"t/topic">>, Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config, #{ {ok, _} = create_bridge(Config, #{
<<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} <<"resource_opts">> => #{<<"request_ttl">> => <<"infinity">>}
}), }),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),

View File

@ -277,7 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n" " request_ttl = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"
@ -314,7 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n" " request_ttl = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"

View File

@ -227,7 +227,6 @@ conn_bridge_example(_Method, Type) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
query_mode => async, query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }

View File

@ -101,8 +101,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
" }\n" " }\n"
" pool_size = 1\n" " pool_size = 1\n"
" resource_opts = {\n" " resource_opts = {\n"
" auto_restart_interval = 5000\n" " health_check_interval = 5000\n"
" request_timeout = 30000\n" " request_ttl = 30000\n"
" query_mode = \"async\"\n" " query_mode = \"async\"\n"
" worker_pool_size = 1\n" " worker_pool_size = 1\n"
" }\n" " }\n"

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -755,11 +755,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"worker_pool_size">> => 2, <<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
%% using a long time so we can test recovery %% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>, <<"request_ttl">> => <<"15s">>,
%% to make it check the healthy quickly %% to make it check the healthy and reconnect quickly
<<"health_check_interval">> => <<"0.5s">>, <<"health_check_interval">> => <<"0.5s">>
%% to make it reconnect quickly
<<"auto_restart_interval">> => <<"1s">>
} }
} }
), ),
@ -865,11 +863,9 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
<<"worker_pool_size">> => 2, <<"worker_pool_size">> => 2,
<<"query_mode">> => <<"async">>, <<"query_mode">> => <<"async">>,
%% using a long time so we can test recovery %% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>, <<"request_ttl">> => <<"15s">>,
%% to make it check the healthy quickly %% to make it check the healthy and reconnect quickly
<<"health_check_interval">> => <<"0.5s">>, <<"health_check_interval">> => <<"0.5s">>
%% to make it reconnect quickly
<<"auto_restart_interval">> => <<"1s">>
} }
} }
), ),

View File

@ -42,7 +42,6 @@ values(_Method) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -127,7 +127,7 @@ opents_config(BridgeType, Config) ->
" enable = true\n" " enable = true\n"
" server = ~p\n" " server = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = sync\n" " query_mode = sync\n"
" }\n" " }\n"
@ -298,7 +298,7 @@ t_write_timeout(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_timeout">> => 500, <<"request_ttl">> => 500,
<<"resume_interval">> => 100, <<"resume_interval">> => 100,
<<"health_check_interval">> => 100 <<"health_check_interval">> => 100
} }

View File

@ -51,7 +51,6 @@ values(_Method) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -203,11 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) ->
" pool_size = 1\n" " pool_size = 1\n"
" sql = \"~s\"\n" " sql = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" auto_restart_interval = \"5s\"\n"
" health_check_interval = \"5s\"\n" " health_check_interval = \"5s\"\n"
" request_timeout = \"30s\"\n" " request_ttl = \"30s\"\n"
" query_mode = \"async\"\n" " query_mode = \"async\"\n"
" enable_batch = true\n"
" batch_size = 3\n" " batch_size = 3\n"
" batch_time = \"3s\"\n" " batch_time = \"3s\"\n"
" worker_pool_size = 1\n" " worker_pool_size = 1\n"

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [ {application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"}, {description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []}, {env, []},

View File

@ -55,7 +55,6 @@ values(_Method, Type) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -193,7 +193,7 @@ pgsql_config(BridgeType, Config) ->
" password = ~p\n" " password = ~p\n"
" sql = ~p\n" " sql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
@ -503,7 +503,6 @@ t_write_timeout(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"auto_restart_interval">> => <<"100ms">>,
<<"resume_interval">> => <<"100ms">>, <<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">> <<"health_check_interval">> => <<"100ms">>
} }

View File

@ -154,8 +154,7 @@ fields(producer_resource_opts) ->
health_check_interval, health_check_interval,
resume_interval, resume_interval,
start_after_created, start_after_created,
start_timeout, start_timeout
auto_restart_interval
], ],
lists:filtermap( lists:filtermap(
fun fun

View File

@ -1040,7 +1040,7 @@ t_resource_manager_crash_before_producers_started(Config) ->
end), end),
%% even if the resource manager is dead, we can still %% even if the resource manager is dead, we can still
%% clear the allocated resources. %% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} = {{error, {config_update_crashed, _}}, {ok, _}} =
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},

View File

@ -57,7 +57,6 @@ values(_Method, Type) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -52,7 +52,6 @@ values(post) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,

View File

@ -149,7 +149,7 @@ rocketmq_config(BridgeType, Config) ->
" secret_key = ~p\n" " secret_key = ~p\n"
" topic = ~p\n" " topic = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1500ms\n" " request_ttl = 1500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"

View File

@ -56,7 +56,6 @@ values(post) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -55,8 +55,8 @@
default_port => ?SQLSERVER_DEFAULT_PORT default_port => ?SQLSERVER_DEFAULT_PORT
}). }).
-define(REQUEST_TIMEOUT(RESOURCE_OPTS), -define(REQUEST_TTL(RESOURCE_OPTS),
maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT) maps:get(request_ttl, RESOURCE_OPTS, ?DEFAULT_REQUEST_TTL)
). ).
-define(BATCH_INSERT_TEMP, batch_insert_temp). -define(BATCH_INSERT_TEMP, batch_insert_temp).
@ -394,7 +394,7 @@ worker_do_insert(
) -> ) ->
LogMeta = #{connector => ResourceId, state => State}, LogMeta = #{connector => ResourceId, state => State},
try try
case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of case execute(Conn, SQL, ?REQUEST_TTL(ResourceOpts)) of
{selected, Rows, _} -> {selected, Rows, _} ->
{ok, Rows}; {ok, Rows};
{updated, _} -> {updated, _} ->

View File

@ -461,7 +461,7 @@ sqlserver_config(BridgeType, Config) ->
" sql = ~p\n" " sql = ~p\n"
" driver = ~p\n" " driver = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" worker_pool_size = ~b\n" " worker_pool_size = ~b\n"

View File

@ -54,7 +54,6 @@ values(_Method) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 8, worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,

View File

@ -190,7 +190,7 @@ tdengine_config(BridgeType, Config) ->
" password = ~p\n" " password = ~p\n"
" sql = ~p\n" " sql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
@ -456,7 +456,7 @@ t_write_timeout(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_timeout">> => 500, <<"request_ttl">> => 500,
<<"resume_interval">> => 100, <<"resume_interval">> => 100,
<<"health_check_interval">> => 100 <<"health_check_interval">> => 100
} }

View File

@ -51,8 +51,10 @@
health_check_timeout => integer(), health_check_timeout => integer(),
%% use start_timeout instead %% use start_timeout instead
wait_for_resource_ready => integer(), wait_for_resource_ready => integer(),
%% use auto_restart_interval instead %% use health_check_interval instead
auto_retry_interval => integer(), auto_retry_interval => integer(),
%% use health_check_interval instead
auto_restart_interval => pos_integer() | infinity,
%%======================================= Deprecated Opts END %%======================================= Deprecated Opts END
worker_pool_size => non_neg_integer(), worker_pool_size => non_neg_integer(),
%% use `integer()` compatibility to release 5.0.0 bpapi %% use `integer()` compatibility to release 5.0.0 bpapi
@ -64,9 +66,6 @@
%% after it is created. But note that a `started` resource is not guaranteed %% after it is created. But note that a `started` resource is not guaranteed
%% to be `connected`. %% to be `connected`.
start_after_created => boolean(), start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_restart_interval => pos_integer() | infinity,
batch_size => pos_integer(), batch_size => pos_integer(),
batch_time => pos_integer(), batch_time => pos_integer(),
max_buffer_bytes => pos_integer(), max_buffer_bytes => pos_integer(),
@ -87,7 +86,8 @@
-define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).
-define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>).
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). -define(DEFAULT_REQUEST_TTL, timer:seconds(45)).
-define(DEFAULT_REQUEST_TTL_RAW, <<"45s">>).
%% count %% count
-define(DEFAULT_BATCH_SIZE, 1). -define(DEFAULT_BATCH_SIZE, 1).
@ -115,10 +115,6 @@
-define(START_AFTER_CREATED, true). -define(START_AFTER_CREATED, true).
-define(START_AFTER_CREATED_RAW, <<"true">>). -define(START_AFTER_CREATED_RAW, <<"true">>).
%% milliseconds
-define(AUTO_RESTART_INTERVAL, 60000).
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
-define(TEST_ID_PREFIX, "_probe_:"). -define(TEST_ID_PREFIX, "_probe_:").
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).

View File

@ -76,7 +76,7 @@
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term(). -type request() :: term().
-type request_from() :: undefined | gen_statem:from(). -type request_from() :: undefined | gen_statem:from().
-type request_timeout() :: infinity | timer:time(). -type request_ttl() :: infinity | timer:time().
-type health_check_interval() :: timer:time(). -type health_check_interval() :: timer:time().
-type state() :: blocked | running. -type state() :: blocked | running.
-type inflight_key() :: integer(). -type inflight_key() :: integer().
@ -187,10 +187,10 @@ init({Id, Index, Opts}) ->
InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
InflightTID = inflight_new(InflightWinSize), InflightTID = inflight_new(InflightWinSize),
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), RequestTTL = maps:get(request_ttl, Opts, ?DEFAULT_REQUEST_TTL),
BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), BatchTime = adjust_batch_time(Id, RequestTTL, BatchTime0),
DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), DefaultResumeInterval = default_resume_interval(RequestTTL, HealthCheckInterval),
ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval),
MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL), MetricsFlushInterval = maps:get(metrics_flush_interval, Opts, ?DEFAULT_METRICS_FLUSH_INTERVAL),
Data0 = #{ Data0 = #{
@ -1733,7 +1733,7 @@ now_() ->
ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
Opts; Opts;
ensure_timeout_query_opts(#{} = Opts0, sync) -> ensure_timeout_query_opts(#{} = Opts0, sync) ->
Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT}; Opts0#{timeout => ?DEFAULT_REQUEST_TTL};
ensure_timeout_query_opts(#{} = Opts0, async) -> ensure_timeout_query_opts(#{} = Opts0, async) ->
Opts0#{timeout => infinity}. Opts0#{timeout => infinity}.
@ -1760,14 +1760,14 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent,
-endif. -endif.
%% To avoid message loss due to misconfigurations, we adjust %% To avoid message loss due to misconfigurations, we adjust
%% `batch_time' based on `request_timeout'. If `batch_time' > %% `batch_time' based on `request_ttl'. If `batch_time' >
%% `request_timeout', all requests will timeout before being sent if %% `request_ttl', all requests will timeout before being sent if
%% the message rate is low. Even worse if `pool_size' is high. %% the message rate is low. Even worse if `pool_size' is high.
%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. %% We cap `batch_time' at `request_ttl div 2' as a rule of thumb.
adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> adjust_batch_time(_Id, _RequestTTL = infinity, BatchTime0) ->
BatchTime0; BatchTime0;
adjust_batch_time(Id, RequestTimeout, BatchTime0) -> adjust_batch_time(Id, RequestTTL, BatchTime0) ->
BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
case BatchTime =:= BatchTime0 of case BatchTime =:= BatchTime0 of
false -> false ->
?SLOG(info, #{ ?SLOG(info, #{
@ -1811,11 +1811,11 @@ replayq_opts(Id, Index, Opts) ->
%% timeout is <= resume interval and the buffer worker is ever %% timeout is <= resume interval and the buffer worker is ever
%% blocked, than all queued requests will basically fail without being %% blocked, than all queued requests will basically fail without being
%% attempted. %% attempted.
-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). -spec default_resume_interval(request_ttl(), health_check_interval()) -> timer:time().
default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> default_resume_interval(_RequestTTL = infinity, HealthCheckInterval) ->
max(1, HealthCheckInterval); max(1, HealthCheckInterval);
default_resume_interval(RequestTimeout, HealthCheckInterval) -> default_resume_interval(RequestTTL, HealthCheckInterval) ->
max(1, min(HealthCheckInterval, RequestTimeout div 3)). max(1, min(HealthCheckInterval, RequestTTL div 3)).
-spec reply_call(reference(), term()) -> ok. -spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) -> reply_call(Alias, Response) ->

View File

@ -448,11 +448,9 @@ try_read_cache(ResId) ->
end. end.
retry_actions(Data) -> retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of case maps:get(health_check_interval, Data#data.opts, ?HEALTHCHECK_INTERVAL) of
undefined -> undefined ->
[]; [];
infinity ->
[];
RetryInterval -> RetryInterval ->
[{state_timeout, RetryInterval, auto_retry}] [{state_timeout, RetryInterval, auto_retry}]
end. end.

View File

@ -26,8 +26,6 @@
%% range interval in ms %% range interval in ms
-define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1). -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).
-define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000). -define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000).
-define(AUTO_RESTART_INTERVAL_RANGE_MIN, 1).
-define(AUTO_RESTART_INTERVAL_RANGE_MAX, 3_600_000).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
@ -55,7 +53,7 @@ fields("creation_opts") ->
{start_timeout, fun start_timeout/1}, {start_timeout, fun start_timeout/1},
{auto_restart_interval, fun auto_restart_interval/1}, {auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1}, {query_mode, fun query_mode/1},
{request_timeout, fun request_timeout/1}, {request_ttl, fun request_ttl/1},
{inflight_window, fun inflight_window/1}, {inflight_window, fun inflight_window/1},
{enable_batch, fun enable_batch/1}, {enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1}, {batch_size, fun batch_size/1},
@ -124,39 +122,22 @@ start_timeout(required) -> false;
start_timeout(_) -> undefined. start_timeout(_) -> undefined.
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); auto_restart_interval(default) -> <<"15s">>;
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;
auto_restart_interval(required) -> false; auto_restart_interval(required) -> false;
auto_restart_interval(validator) -> fun auto_restart_interval_range/1; auto_restart_interval(deprecated) -> {since, "5.1.0"};
auto_restart_interval(_) -> undefined. auto_restart_interval(_) -> undefined.
auto_restart_interval_range(infinity) ->
ok;
auto_restart_interval_range(AutoRestartInterval) when
is_integer(AutoRestartInterval) andalso
AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso
AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX
->
ok;
auto_restart_interval_range(AutoRestartInterval) ->
Message = get_out_of_range_msg(
<<"Auto Restart Interval">>,
AutoRestartInterval,
?AUTO_RESTART_INTERVAL_RANGE_MIN,
?AUTO_RESTART_INTERVAL_RANGE_MAX
),
{error, Message}.
query_mode(type) -> enum([sync, async]); query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode"); query_mode(desc) -> ?DESC("query_mode");
query_mode(default) -> async; query_mode(default) -> async;
query_mode(required) -> false; query_mode(required) -> false;
query_mode(_) -> undefined. query_mode(_) -> undefined.
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); request_ttl(type) -> hoconsc:union([emqx_schema:duration_ms(), infinity]);
request_timeout(desc) -> ?DESC("request_timeout"); request_ttl(aliases) -> [request_timeout];
request_timeout(default) -> <<"15s">>; request_ttl(desc) -> ?DESC("request_ttl");
request_timeout(_) -> undefined. request_ttl(default) -> ?DEFAULT_REQUEST_TTL_RAW;
request_ttl(_) -> undefined.
enable_batch(type) -> boolean(); enable_batch(type) -> boolean();
enable_batch(required) -> false; enable_batch(required) -> false;

View File

@ -2823,7 +2823,7 @@ t_volatile_offload_mode(_Config) ->
t_late_call_reply(_Config) -> t_late_call_reply(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync), emqx_connector_demo:set_callback_mode(always_sync),
RequestTimeout = 500, RequestTTL = 500,
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_resource:create( emqx_resource:create(
@ -2833,7 +2833,7 @@ t_late_call_reply(_Config) ->
#{name => test_resource}, #{name => test_resource},
#{ #{
buffer_mode => memory_only, buffer_mode => memory_only,
request_timeout => RequestTimeout, request_ttl => RequestTTL,
query_mode => sync query_mode => sync
} }
) )
@ -2844,13 +2844,13 @@ t_late_call_reply(_Config) ->
%% have been already returned (a timeout), but the resource will %% have been already returned (a timeout), but the resource will
%% still send a message with the reply. %% still send a message with the reply.
%% The demo connector will reply with `{error, timeout}' after 1 s. %% The demo connector will reply with `{error, timeout}' after 1 s.
SleepFor = RequestTimeout + 500, SleepFor = RequestTTL + 500,
?assertMatch( ?assertMatch(
{error, {resource_error, #{reason := timeout}}}, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query( emqx_resource:query(
?ID, ?ID,
{sync_sleep_before_reply, SleepFor}, {sync_sleep_before_reply, SleepFor},
#{timeout => RequestTimeout} #{timeout => RequestTTL}
) )
), ),
%% Our process shouldn't receive any late messages. %% Our process shouldn't receive any late messages.
@ -2887,7 +2887,7 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
ResourceConfig, ResourceConfig,
#{auto_restart_interval => 100, health_check_interval => 100} #{health_check_interval => 100}
), ),
#{?snk_kind := resource_activate_alarm, resource_id := ?ID} #{?snk_kind := resource_activate_alarm, resource_id := ?ID}
), ),

View File

@ -0,0 +1,3 @@
The data bridge resource option `auto_restart_interval` was deprecated in favor of `health_check_interval`, and `request_timeout` was renamed to `request_ttl`. Also, the default `request_ttl` value went from 15 seconds to 45 seconds.
The previous existence of both `auto_restart_interval` and `health_check_interval` was a source of confusion, as both parameters influenced the recovery of data bridges under failures. An inconsistent configuration of those two parameters could lead to messages being expired without a chance to retry. Now, `health_check_interval` is used both to control the periodicity of health checks that may transition the data bridge into `disconnected` or `connecting` states, as well as recovering from `disconnected`.

View File

@ -53,7 +53,6 @@ values(_Method) ->
resource_opts => #{ resource_opts => #{
worker_pool_size => 1, worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,

View File

@ -178,7 +178,7 @@ mysql_config(BridgeType, Config) ->
" pool_size = ~b\n" " pool_size = ~b\n"
" sql = ~p\n" " sql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" worker_pool_size = ~b\n" " worker_pool_size = ~b\n"

View File

@ -554,7 +554,7 @@ resource_configs(#{query_mode := QueryMode}) ->
<<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"start_timeout">> => <<"15s">>, <<"start_timeout">> => <<"15s">>,
<<"batch_time">> => <<"4s">>, <<"batch_time">> => <<"4s">>,
<<"request_timeout">> => <<"30s">> <<"request_ttl">> => <<"30s">>
} }
}. }.

View File

@ -64,12 +64,6 @@ pubsub_topic.desc:
pubsub_topic.label: pubsub_topic.label:
"""GCP PubSub Topic""" """GCP PubSub Topic"""
request_timeout.desc:
"""Deprecated: Configure the request timeout in the buffer settings."""
request_timeout.label:
"""Request Timeout"""
service_account_json.desc: service_account_json.desc:
"""JSON containing the GCP Service Account credentials to be used with PubSub. """JSON containing the GCP Service Account credentials to be used with PubSub.
When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed.""" When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed."""

View File

@ -1,11 +1,5 @@
emqx_resource_schema { emqx_resource_schema {
auto_restart_interval.desc:
"""The auto restart interval after the resource is disconnected."""
auto_restart_interval.label:
"""Auto Restart Interval"""
batch_size.desc: batch_size.desc:
"""Maximum batch count. If equal to 1, there's effectively no batching.""" """Maximum batch count. If equal to 1, there's effectively no batching."""
@ -76,11 +70,11 @@ query_mode.desc:
query_mode.label: query_mode.label:
"""Query mode""" """Query mode"""
request_timeout.desc: request_ttl.desc:
"""Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
request_timeout.label: request_ttl.label:
"""Request Expiry""" """Request TTL"""
resource_opts.desc: resource_opts.desc:
"""Resource options.""" """Resource options."""

View File

@ -7,13 +7,12 @@ bridges {
precision = "ms" precision = "ms"
resource_opts { resource_opts {
inflight_window = 100 inflight_window = 100
auto_restart_interval = "60s"
batch_size = 100 batch_size = 100
batch_time = "10ms" batch_time = "10ms"
health_check_interval = "15s" health_check_interval = "15s"
max_buffer_bytes = "1GB" max_buffer_bytes = "1GB"
query_mode = "sync" query_mode = "sync"
request_timeout = "15s" request_ttl = "15s"
start_after_created = "true" start_after_created = "true"
start_timeout = "5s" start_timeout = "5s"
worker_pool_size = 4 worker_pool_size = 4