feat(resource): deprecate `auto_restart_interval` in favor of `health_check_interval`
See: https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options Current problem: In 5.0.x, we have two timer options that control the state changing of buffer worker resources: auto_restart_interval and health_check_interval. - auto_restart_interval controls how often the resource attempts to transition from disconnected to connected. - health_check_interval controls how often the resource is checked and potentially moved from connected to disconnected or connecting. The existence of two independent timers for very similar purposes is confusing to users, QA and even developers. Also, an intimately related configuration is request_timeout, which can interact badly with auto_restart_interval if the latter is poorly configured: requests may always expire if request_timeout < auto_restart_interval and if the resource enters the disconnected state. For health_check_interval, we attempt to derive a sane default that gives requests a chance to retry (if request timeout is finite, then the resource retries requests with a period of min(health_check_interval, request_timeout / 3). Another problem with the separate auto_restart_interval is that its default value (60 s) is too high when compared to the default request timeout and health check, leading to the problems described above if not tuned. Proposed solution: We propose to drop auto_restart_interval in favor of health_check_interval, which will be used for both disconnected -> connected and connected -> {disconnected, connecting} transition checks. With that, the resource will attempt to reconnect at the same interval as the health check, which currently is 15 s. Also, as two smaller changes to accompany this one: - Increase the default request_timeout from 15 s to 45 s. - Rename request_timeout to request_ttl.
This commit is contained in:
parent
b5f24c4f88
commit
10425eb925
|
@ -218,7 +218,6 @@ info_example_basic(webhook) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
health_check_interval => 15000,
|
health_check_interval => 15000,
|
||||||
auto_restart_interval => 15000,
|
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
inflight_window => 100,
|
inflight_window => 100,
|
||||||
max_buffer_bytes => 100 * 1024 * 1024
|
max_buffer_bytes => 100 * 1024 * 1024
|
||||||
|
@ -244,7 +243,6 @@ mqtt_main_example() ->
|
||||||
max_inflight => 100,
|
max_inflight => 100,
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
health_check_interval => <<"15s">>,
|
health_check_interval => <<"15s">>,
|
||||||
auto_restart_interval => <<"60s">>,
|
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_buffer_bytes => 100 * 1024 * 1024
|
max_buffer_bytes => 100 * 1024 * 1024
|
||||||
},
|
},
|
||||||
|
|
|
@ -87,7 +87,6 @@ default_ssl() ->
|
||||||
default_resource_opts() ->
|
default_resource_opts() ->
|
||||||
#{
|
#{
|
||||||
<<"inflight_window">> => 100,
|
<<"inflight_window">> => 100,
|
||||||
<<"auto_restart_interval">> => <<"60s">>,
|
|
||||||
<<"health_check_interval">> => <<"15s">>,
|
<<"health_check_interval">> => <<"15s">>,
|
||||||
<<"max_buffer_bytes">> => <<"1GB">>,
|
<<"max_buffer_bytes">> => <<"1GB">>,
|
||||||
<<"query_mode">> => <<"sync">>,
|
<<"query_mode">> => <<"sync">>,
|
||||||
|
|
|
@ -86,8 +86,7 @@ groups() ->
|
||||||
SingleOnlyTests = [
|
SingleOnlyTests = [
|
||||||
t_broken_bpapi_vsn,
|
t_broken_bpapi_vsn,
|
||||||
t_old_bpapi_vsn,
|
t_old_bpapi_vsn,
|
||||||
t_bridges_probe,
|
t_bridges_probe
|
||||||
t_auto_restart_interval
|
|
||||||
],
|
],
|
||||||
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
|
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
|
||||||
[
|
[
|
||||||
|
@ -559,89 +558,6 @@ t_http_crud_apis(Config) ->
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
|
||||||
|
|
||||||
t_auto_restart_interval(Config) ->
|
|
||||||
Port = ?config(port, Config),
|
|
||||||
%% assert we there's no bridges at first
|
|
||||||
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
|
|
||||||
|
|
||||||
meck:new(emqx_resource, [passthrough]),
|
|
||||||
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),
|
|
||||||
|
|
||||||
%% then we add a webhook bridge, using POST
|
|
||||||
%% POST /bridges/ will create a bridge
|
|
||||||
URL1 = ?URL(Port, "path1"),
|
|
||||||
Name = ?BRIDGE_NAME,
|
|
||||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
|
|
||||||
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
|
|
||||||
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
|
|
||||||
},
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
?assertMatch(
|
|
||||||
{ok, 201, #{
|
|
||||||
<<"type">> := ?BRIDGE_TYPE_HTTP,
|
|
||||||
<<"name">> := Name,
|
|
||||||
<<"enable">> := true,
|
|
||||||
<<"status">> := _,
|
|
||||||
<<"node_status">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
|
||||||
}},
|
|
||||||
request_json(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
BridgeParams,
|
|
||||||
Config
|
|
||||||
)
|
|
||||||
),
|
|
||||||
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
|
|
||||||
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
|
|
||||||
end,
|
|
||||||
fun(Trace0) ->
|
|
||||||
Trace = ?of_kind(resource_auto_reconnect, Trace0),
|
|
||||||
?assertMatch([#{}], Trace),
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
),
|
|
||||||
%% delete the bridge
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
|
|
||||||
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
|
|
||||||
|
|
||||||
%% auto_retry_interval=infinity
|
|
||||||
BridgeParams1 = BridgeParams#{
|
|
||||||
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
|
|
||||||
},
|
|
||||||
?check_trace(
|
|
||||||
begin
|
|
||||||
?assertMatch(
|
|
||||||
{ok, 201, #{
|
|
||||||
<<"type">> := ?BRIDGE_TYPE_HTTP,
|
|
||||||
<<"name">> := Name,
|
|
||||||
<<"enable">> := true,
|
|
||||||
<<"status">> := _,
|
|
||||||
<<"node_status">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
|
||||||
}},
|
|
||||||
request_json(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
BridgeParams1,
|
|
||||||
Config
|
|
||||||
)
|
|
||||||
),
|
|
||||||
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
|
|
||||||
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
|
|
||||||
end,
|
|
||||||
fun(Trace0) ->
|
|
||||||
Trace = ?of_kind(resource_auto_reconnect, Trace0),
|
|
||||||
?assertMatch([], Trace),
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
),
|
|
||||||
%% delete the bridge
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
|
|
||||||
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
|
|
||||||
meck:unload(emqx_resource).
|
|
||||||
|
|
||||||
t_http_bridges_local_topic(Config) ->
|
t_http_bridges_local_topic(Config) ->
|
||||||
Port = ?config(port, Config),
|
Port = ?config(port, Config),
|
||||||
%% assert we there's no bridges at first
|
%% assert we there's no bridges at first
|
||||||
|
|
|
@ -182,7 +182,6 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
" body = \"${id}\""
|
" body = \"${id}\""
|
||||||
" resource_opts {\n"
|
" resource_opts {\n"
|
||||||
" inflight_window = 100\n"
|
" inflight_window = 100\n"
|
||||||
" auto_restart_interval = \"60s\"\n"
|
|
||||||
" health_check_interval = \"15s\"\n"
|
" health_check_interval = \"15s\"\n"
|
||||||
" max_buffer_bytes = \"1GB\"\n"
|
" max_buffer_bytes = \"1GB\"\n"
|
||||||
" query_mode = \"~s\"\n"
|
" query_mode = \"~s\"\n"
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_cassandra, [
|
{application, emqx_bridge_cassandra, [
|
||||||
{description, "EMQX Enterprise Cassandra Bridge"},
|
{description, "EMQX Enterprise Cassandra Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, ecql]},
|
{applications, [kernel, stdlib, ecql]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -59,7 +59,6 @@ values(_Method, Type) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
|
|
|
@ -511,7 +511,6 @@ t_write_failure(Config) ->
|
||||||
#{
|
#{
|
||||||
<<"resource_opts">> =>
|
<<"resource_opts">> =>
|
||||||
#{
|
#{
|
||||||
<<"auto_restart_interval">> => <<"100ms">>,
|
|
||||||
<<"resume_interval">> => <<"100ms">>,
|
<<"resume_interval">> => <<"100ms">>,
|
||||||
<<"health_check_interval">> => <<"100ms">>
|
<<"health_check_interval">> => <<"100ms">>
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_clickhouse, [
|
{application, emqx_bridge_clickhouse, [
|
||||||
{description, "EMQX Enterprise ClickHouse Bridge"},
|
{description, "EMQX Enterprise ClickHouse Bridge"},
|
||||||
{vsn, "0.2.0"},
|
{vsn, "0.2.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
|
{applications, [kernel, stdlib, clickhouse, emqx_resource]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -56,7 +56,6 @@ values(_Method, Type) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_dynamo, [
|
{application, emqx_bridge_dynamo, [
|
||||||
{description, "EMQX Enterprise Dynamo Bridge"},
|
{description, "EMQX Enterprise Dynamo Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, erlcloud]},
|
{applications, [kernel, stdlib, erlcloud]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -52,7 +52,6 @@ values(_Method) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
|
|
|
@ -227,7 +227,6 @@ conn_bridge_example(_Method, Type) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
|
||||||
" }\n"
|
" }\n"
|
||||||
" pool_size = 1\n"
|
" pool_size = 1\n"
|
||||||
" resource_opts = {\n"
|
" resource_opts = {\n"
|
||||||
" auto_restart_interval = 5000\n"
|
" health_check_interval = 5000\n"
|
||||||
" request_timeout = 30000\n"
|
" request_timeout = 30000\n"
|
||||||
" query_mode = \"async\"\n"
|
" query_mode = \"async\"\n"
|
||||||
" worker_pool_size = 1\n"
|
" worker_pool_size = 1\n"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge_mqtt, [
|
{application, emqx_bridge_mqtt, [
|
||||||
{description, "EMQX MQTT Broker Bridge"},
|
{description, "EMQX MQTT Broker Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -756,10 +756,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
<<"query_mode">> => <<"sync">>,
|
<<"query_mode">> => <<"sync">>,
|
||||||
%% using a long time so we can test recovery
|
%% using a long time so we can test recovery
|
||||||
<<"request_timeout">> => <<"15s">>,
|
<<"request_timeout">> => <<"15s">>,
|
||||||
%% to make it check the healthy quickly
|
%% to make it check the healthy and reconnect quickly
|
||||||
<<"health_check_interval">> => <<"0.5s">>,
|
<<"health_check_interval">> => <<"0.5s">>
|
||||||
%% to make it reconnect quickly
|
|
||||||
<<"auto_restart_interval">> => <<"1s">>
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -866,10 +864,8 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
||||||
<<"query_mode">> => <<"async">>,
|
<<"query_mode">> => <<"async">>,
|
||||||
%% using a long time so we can test recovery
|
%% using a long time so we can test recovery
|
||||||
<<"request_timeout">> => <<"15s">>,
|
<<"request_timeout">> => <<"15s">>,
|
||||||
%% to make it check the healthy quickly
|
%% to make it check the healthy and reconnect quickly
|
||||||
<<"health_check_interval">> => <<"0.5s">>,
|
<<"health_check_interval">> => <<"0.5s">>
|
||||||
%% to make it reconnect quickly
|
|
||||||
<<"auto_restart_interval">> => <<"1s">>
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_opents, [
|
{application, emqx_bridge_opents, [
|
||||||
{description, "EMQX Enterprise OpenTSDB Bridge"},
|
{description, "EMQX Enterprise OpenTSDB Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -42,7 +42,6 @@ values(_Method) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_oracle, [
|
{application, emqx_bridge_oracle, [
|
||||||
{description, "EMQX Enterprise Oracle Database Bridge"},
|
{description, "EMQX Enterprise Oracle Database Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -50,7 +50,6 @@ values(_Method) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -203,11 +203,9 @@ oracle_config(TestCase, _ConnectionType, Config) ->
|
||||||
" pool_size = 1\n"
|
" pool_size = 1\n"
|
||||||
" sql = \"~s\"\n"
|
" sql = \"~s\"\n"
|
||||||
" resource_opts = {\n"
|
" resource_opts = {\n"
|
||||||
" auto_restart_interval = \"5s\"\n"
|
|
||||||
" health_check_interval = \"5s\"\n"
|
" health_check_interval = \"5s\"\n"
|
||||||
" request_timeout = \"30s\"\n"
|
" request_timeout = \"30s\"\n"
|
||||||
" query_mode = \"async\"\n"
|
" query_mode = \"async\"\n"
|
||||||
" enable_batch = true\n"
|
|
||||||
" batch_size = 3\n"
|
" batch_size = 3\n"
|
||||||
" batch_time = \"3s\"\n"
|
" batch_time = \"3s\"\n"
|
||||||
" worker_pool_size = 1\n"
|
" worker_pool_size = 1\n"
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_pgsql, [
|
{application, emqx_bridge_pgsql, [
|
||||||
{description, "EMQX Enterprise PostgreSQL Bridge"},
|
{description, "EMQX Enterprise PostgreSQL Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib]},
|
{applications, [kernel, stdlib]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -55,7 +55,6 @@ values(_Method, Type) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -503,7 +503,6 @@ t_write_timeout(Config) ->
|
||||||
Config,
|
Config,
|
||||||
#{
|
#{
|
||||||
<<"resource_opts">> => #{
|
<<"resource_opts">> => #{
|
||||||
<<"auto_restart_interval">> => <<"100ms">>,
|
|
||||||
<<"resume_interval">> => <<"100ms">>,
|
<<"resume_interval">> => <<"100ms">>,
|
||||||
<<"health_check_interval">> => <<"100ms">>
|
<<"health_check_interval">> => <<"100ms">>
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,8 +154,7 @@ fields(producer_resource_opts) ->
|
||||||
health_check_interval,
|
health_check_interval,
|
||||||
resume_interval,
|
resume_interval,
|
||||||
start_after_created,
|
start_after_created,
|
||||||
start_timeout,
|
start_timeout
|
||||||
auto_restart_interval
|
|
||||||
],
|
],
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun
|
fun
|
||||||
|
|
|
@ -57,7 +57,6 @@ values(_Method, Type) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -52,7 +52,6 @@ values(post) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_sqlserver, [
|
{application, emqx_bridge_sqlserver, [
|
||||||
{description, "EMQX Enterprise SQL Server Bridge"},
|
{description, "EMQX Enterprise SQL Server Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, odbc]},
|
{applications, [kernel, stdlib, odbc]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -56,7 +56,6 @@ values(post) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -54,7 +54,6 @@ values(_Method) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
|
|
|
@ -51,8 +51,10 @@
|
||||||
health_check_timeout => integer(),
|
health_check_timeout => integer(),
|
||||||
%% use start_timeout instead
|
%% use start_timeout instead
|
||||||
wait_for_resource_ready => integer(),
|
wait_for_resource_ready => integer(),
|
||||||
%% use auto_restart_interval instead
|
%% use health_check_interval instead
|
||||||
auto_retry_interval => integer(),
|
auto_retry_interval => integer(),
|
||||||
|
%% use health_check_interval instead
|
||||||
|
auto_restart_interval => pos_integer() | infinity,
|
||||||
%%======================================= Deprecated Opts END
|
%%======================================= Deprecated Opts END
|
||||||
worker_pool_size => non_neg_integer(),
|
worker_pool_size => non_neg_integer(),
|
||||||
%% use `integer()` compatibility to release 5.0.0 bpapi
|
%% use `integer()` compatibility to release 5.0.0 bpapi
|
||||||
|
@ -64,9 +66,6 @@
|
||||||
%% after it is created. But note that a `started` resource is not guaranteed
|
%% after it is created. But note that a `started` resource is not guaranteed
|
||||||
%% to be `connected`.
|
%% to be `connected`.
|
||||||
start_after_created => boolean(),
|
start_after_created => boolean(),
|
||||||
%% If the resource disconnected, we can set to retry starting the resource
|
|
||||||
%% periodically.
|
|
||||||
auto_restart_interval => pos_integer() | infinity,
|
|
||||||
batch_size => pos_integer(),
|
batch_size => pos_integer(),
|
||||||
batch_time => pos_integer(),
|
batch_time => pos_integer(),
|
||||||
max_buffer_bytes => pos_integer(),
|
max_buffer_bytes => pos_integer(),
|
||||||
|
@ -115,10 +114,6 @@
|
||||||
-define(START_AFTER_CREATED, true).
|
-define(START_AFTER_CREATED, true).
|
||||||
-define(START_AFTER_CREATED_RAW, <<"true">>).
|
-define(START_AFTER_CREATED_RAW, <<"true">>).
|
||||||
|
|
||||||
%% milliseconds
|
|
||||||
-define(AUTO_RESTART_INTERVAL, 60000).
|
|
||||||
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
|
|
||||||
|
|
||||||
-define(TEST_ID_PREFIX, "_probe_:").
|
-define(TEST_ID_PREFIX, "_probe_:").
|
||||||
-define(RES_METRICS, resource_metrics).
|
-define(RES_METRICS, resource_metrics).
|
||||||
|
|
||||||
|
|
|
@ -448,11 +448,9 @@ try_read_cache(ResId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_actions(Data) ->
|
retry_actions(Data) ->
|
||||||
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
|
case maps:get(health_check_interval, Data#data.opts, ?HEALTHCHECK_INTERVAL) of
|
||||||
undefined ->
|
undefined ->
|
||||||
[];
|
[];
|
||||||
infinity ->
|
|
||||||
[];
|
|
||||||
RetryInterval ->
|
RetryInterval ->
|
||||||
[{state_timeout, RetryInterval, auto_retry}]
|
[{state_timeout, RetryInterval, auto_retry}]
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
%% range interval in ms
|
%% range interval in ms
|
||||||
-define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).
|
-define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).
|
||||||
-define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000).
|
-define(HEALTH_CHECK_INTERVAL_RANGE_MAX, 3_600_000).
|
||||||
-define(AUTO_RESTART_INTERVAL_RANGE_MIN, 1).
|
|
||||||
-define(AUTO_RESTART_INTERVAL_RANGE_MAX, 3_600_000).
|
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% Hocon Schema Definitions
|
%% Hocon Schema Definitions
|
||||||
|
@ -124,29 +122,11 @@ start_timeout(required) -> false;
|
||||||
start_timeout(_) -> undefined.
|
start_timeout(_) -> undefined.
|
||||||
|
|
||||||
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
||||||
auto_restart_interval(desc) -> ?DESC("auto_restart_interval");
|
auto_restart_interval(default) -> <<"15s">>;
|
||||||
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;
|
|
||||||
auto_restart_interval(required) -> false;
|
auto_restart_interval(required) -> false;
|
||||||
auto_restart_interval(validator) -> fun auto_restart_interval_range/1;
|
auto_restart_interval(deprecated) -> {since, "5.1.0"};
|
||||||
auto_restart_interval(_) -> undefined.
|
auto_restart_interval(_) -> undefined.
|
||||||
|
|
||||||
auto_restart_interval_range(infinity) ->
|
|
||||||
ok;
|
|
||||||
auto_restart_interval_range(AutoRestartInterval) when
|
|
||||||
is_integer(AutoRestartInterval) andalso
|
|
||||||
AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso
|
|
||||||
AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX
|
|
||||||
->
|
|
||||||
ok;
|
|
||||||
auto_restart_interval_range(AutoRestartInterval) ->
|
|
||||||
Message = get_out_of_range_msg(
|
|
||||||
<<"Auto Restart Interval">>,
|
|
||||||
AutoRestartInterval,
|
|
||||||
?AUTO_RESTART_INTERVAL_RANGE_MIN,
|
|
||||||
?AUTO_RESTART_INTERVAL_RANGE_MAX
|
|
||||||
),
|
|
||||||
{error, Message}.
|
|
||||||
|
|
||||||
query_mode(type) -> enum([sync, async]);
|
query_mode(type) -> enum([sync, async]);
|
||||||
query_mode(desc) -> ?DESC("query_mode");
|
query_mode(desc) -> ?DESC("query_mode");
|
||||||
query_mode(default) -> async;
|
query_mode(default) -> async;
|
||||||
|
|
|
@ -2887,7 +2887,7 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
|
||||||
?DEFAULT_RESOURCE_GROUP,
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
ResourceConfig,
|
ResourceConfig,
|
||||||
#{auto_restart_interval => 100, health_check_interval => 100}
|
#{health_check_interval => 100}
|
||||||
),
|
),
|
||||||
#{?snk_kind := resource_activate_alarm, resource_id := ?ID}
|
#{?snk_kind := resource_activate_alarm, resource_id := ?ID}
|
||||||
),
|
),
|
||||||
|
|
|
@ -53,7 +53,6 @@ values(_Method) ->
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
|
|
|
@ -1,11 +1,5 @@
|
||||||
emqx_resource_schema {
|
emqx_resource_schema {
|
||||||
|
|
||||||
auto_restart_interval.desc:
|
|
||||||
"""The auto restart interval after the resource is disconnected."""
|
|
||||||
|
|
||||||
auto_restart_interval.label:
|
|
||||||
"""Auto Restart Interval"""
|
|
||||||
|
|
||||||
batch_size.desc:
|
batch_size.desc:
|
||||||
"""Maximum batch count. If equal to 1, there's effectively no batching."""
|
"""Maximum batch count. If equal to 1, there's effectively no batching."""
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ bridges {
|
||||||
precision = "ms"
|
precision = "ms"
|
||||||
resource_opts {
|
resource_opts {
|
||||||
inflight_window = 100
|
inflight_window = 100
|
||||||
auto_restart_interval = "60s"
|
|
||||||
batch_size = 100
|
batch_size = 100
|
||||||
batch_time = "10ms"
|
batch_time = "10ms"
|
||||||
health_check_interval = "15s"
|
health_check_interval = "15s"
|
||||||
|
|
Loading…
Reference in New Issue