feat: add more resource creation opts

This commit is contained in:
Shawn 2022-08-12 11:33:19 +08:00
parent bed10b097f
commit 0cdf4b47f1
9 changed files with 136 additions and 35 deletions

View File

@ -47,7 +47,6 @@
]). ]).
-define(DEFAULT_RESOURCE_OPTS, #{ -define(DEFAULT_RESOURCE_OPTS, #{
auto_retry_interval => 6000,
start_after_created => false start_after_created => false
}). }).

View File

@ -40,7 +40,6 @@
]). ]).
-define(DEFAULT_RESOURCE_OPTS, #{ -define(DEFAULT_RESOURCE_OPTS, #{
auto_retry_interval => 6000,
start_after_created => false start_after_created => false
}). }).

View File

@ -89,7 +89,7 @@ create(BridgeId, Conf) ->
create(BridgeType, BridgeName, Conf). create(BridgeType, BridgeName, Conf).
create(Type, Name, Conf) -> create(Type, Name, Conf) ->
create(Type, Name, Conf, #{auto_retry_interval => 60000}). create(Type, Name, Conf, #{}).
create(Type, Name, Conf, Opts) -> create(Type, Name, Conf, Opts) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -169,7 +169,7 @@ recreate(Type, Name, Conf, Opts) ->
resource_id(Type, Name), resource_id(Type, Name),
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf), parse_confs(Type, Name, Conf),
Opts#{auto_retry_interval => 60000} Opts
). ).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->

View File

@ -50,7 +50,7 @@ basic_config() ->
default => egress default => egress
} }
)} )}
] ++ ] ++ webhook_creation_opts() ++
proplists:delete(base_url, emqx_connector_http:fields(config)). proplists:delete(base_url, emqx_connector_http:fields(config)).
request_config() -> request_config() ->
@ -116,6 +116,22 @@ request_config() ->
)} )}
]. ].
webhook_creation_opts() ->
Opts = emqx_resource_schema:fields(creation_opts),
lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
Opts
).
unsupported_opts() ->
[
enable_batch,
batch_size,
batch_time
].
%%====================================================================================== %%======================================================================================
type_field() -> type_field() ->

View File

@ -1,4 +1,58 @@
emqx_resource_schema { emqx_resource_schema {
health_check_interval {
desc {
en: """Health check interval, in milliseconds."""
zh: """健康检查间隔,单位毫秒。"""
}
label {
en: """Health Check Interval"""
zh: """健康检查间隔"""
}
}
start_after_created {
desc {
en: """Whether start the resource right after created."""
zh: """是否在创建资源后立即启动资源。"""
}
label {
en: """Start After Created"""
zh: """创建后立即启动"""
}
}
start_timeout {
desc {
en: """
If 'start_after_created' enabled, how long time do we wait for the
resource get started, in milliseconds.
"""
zh: """
如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。
"""
}
label {
en: """Start Timeout"""
zh: """启动超时时间"""
}
}
auto_restart_interval {
desc {
en: """
The auto restart interval after the resource is disconnected, in milliseconds.
"""
zh: """
资源断开以后,自动重连的时间间隔,单位毫秒。
"""
}
label {
en: """Auto Restart Interval"""
zh: """自动重连间隔"""
}
}
query_mode { query_mode {
desc { desc {
en: """Query mode. Optional 'sync/async', default 'sync'.""" en: """Query mode. Optional 'sync/async', default 'sync'."""
@ -6,7 +60,7 @@ emqx_resource_schema {
} }
label { label {
en: """query_mode""" en: """query_mode"""
zh: """query_mode""" zh: """请求模式"""
} }
} }

View File

@ -41,18 +41,25 @@
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().
-type creation_opts() :: #{ -type creation_opts() :: #{
health_check_interval => integer(), %%======================================= Deprecated Opts:
%% use health_check_interval instead
health_check_timeout => integer(), health_check_timeout => integer(),
%% We can choose to block the return of emqx_resource:start until %% use start_timeout instead
%% the resource connected, wait max to `wait_for_resource_ready` ms.
wait_for_resource_ready => integer(), wait_for_resource_ready => integer(),
%% use auto_restart_interval instead
auto_retry_interval => integer(),
%%======================================= Deprecated Opts End
health_check_interval => integer(),
%% We can choose to block the return of emqx_resource:start until
%% the resource connected, wait max to `start_timeout` ms.
start_timeout => integer(),
%% If `start_after_created` is set to true, the resource is started right %% If `start_after_created` is set to true, the resource is started right
%% after it is created. But note that a `started` resource is not guaranteed %% after it is created. But note that a `started` resource is not guaranteed
%% to be `connected`. %% to be `connected`.
start_after_created => boolean(), start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource %% If the resource disconnected, we can set to retry starting the resource
%% periodically. %% periodically.
auto_retry_interval => integer(), auto_restart_interval => integer(),
enable_batch => boolean(), enable_batch => boolean(),
batch_size => integer(), batch_size => integer(),
batch_time => integer(), batch_time => integer(),
@ -68,17 +75,17 @@
| {error, term()} | {error, term()}
| {resource_down, term()}. | {resource_down, term()}.
%% count
-define(DEFAULT_BATCH_SIZE, 100).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 10).
%% bytes
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
-define(DEFAULT_BATCH_SIZE, 100).
-define(DEFAULT_BATCH_TIME, 10).
-define(DEFAULT_INFLIGHT, 100). -define(DEFAULT_INFLIGHT, 100).
-define(HEALTHCHECK_INTERVAL, 15000).
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
-define(RESUME_INTERVAL, 15000). -define(RESUME_INTERVAL, 15000).
-define(START_AFTER_CREATED, true).
-define(START_TIMEOUT, 5000).
-define(START_TIMEOUT_RAW, <<"5s">>).
-define(AUTO_RESTART_INTERVAL, 60000).
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
-define(TEST_ID_PREFIX, "_test_:"). -define(TEST_ID_PREFIX, "_test_:").
-define(RES_METRICS, resource_metrics). -define(RES_METRICS, resource_metrics).

View File

@ -282,10 +282,9 @@ get_instance(ResId) ->
fetch_creation_opts(Opts) -> fetch_creation_opts(Opts) ->
SupportedOpts = [ SupportedOpts = [
health_check_interval, health_check_interval,
health_check_timeout, start_timeout,
wait_for_resource_ready,
start_after_created, start_after_created,
auto_retry_interval, auto_restart_interval,
enable_batch, enable_batch,
batch_size, batch_size,
batch_time, batch_time,

View File

@ -57,7 +57,6 @@
-type data() :: #data{}. -type data() :: #data{}.
-define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
-define(HEALTHCHECK_INTERVAL, 15000).
-define(ETS_TABLE, ?MODULE). -define(ETS_TABLE, ?MODULE).
-define(WAIT_FOR_RESOURCE_DELAY, 100). -define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(T_OPERATION, 5000). -define(T_OPERATION, 5000).
@ -127,9 +126,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
[matched] [matched]
), ),
ok = emqx_resource_worker_sup:start_workers(ResId, Opts), ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, true) of case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true -> true ->
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)); wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
false -> false ->
ok ok
end, end,
@ -147,7 +146,7 @@ create_dry_run(ResourceType, Config) ->
ok = emqx_resource_manager_sup:ensure_child( ok = emqx_resource_manager_sup:ensure_child(
MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{} MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
), ),
case wait_for_resource_ready(ResId, 15000) of case wait_for_ready(ResId, 15000) of
ok -> ok ->
remove(ResId); remove(ResId);
timeout -> timeout ->
@ -170,7 +169,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
restart(ResId, Opts) when is_binary(ResId) -> restart(ResId, Opts) when is_binary(ResId) ->
case safe_call(ResId, restart, ?T_OPERATION) of case safe_call(ResId, restart, ?T_OPERATION) of
ok -> ok ->
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
ok; ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
@ -181,7 +180,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
start(ResId, Opts) -> start(ResId, Opts) ->
case safe_call(ResId, start, ?T_OPERATION) of case safe_call(ResId, start, ?T_OPERATION) of
ok -> ok ->
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
ok; ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
@ -422,7 +421,7 @@ get_owner(ResId) ->
end. end.
handle_disconnected_state_enter(Data) -> handle_disconnected_state_enter(Data) ->
case maps:get(auto_retry_interval, Data#data.opts, undefined) of case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
undefined -> undefined ->
{next_state, disconnected, Data}; {next_state, disconnected, Data};
RetryInterval -> RetryInterval ->
@ -573,19 +572,19 @@ data_record_to_external_map_with_metrics(Data) ->
metrics => get_metrics(Data#data.id) metrics => get_metrics(Data#data.id)
}. }.
-spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout. -spec wait_for_ready(resource_id(), integer()) -> ok | timeout.
wait_for_resource_ready(ResId, WaitTime) -> wait_for_ready(ResId, WaitTime) ->
do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
do_wait_for_resource_ready(_ResId, 0) -> do_wait_for_ready(_ResId, 0) ->
timeout; timeout;
do_wait_for_resource_ready(ResId, Retry) -> do_wait_for_ready(ResId, Retry) ->
case ets_lookup(ResId) of case ets_lookup(ResId) of
{ok, _Group, #{status := connected}} -> {ok, _Group, #{status := connected}} ->
ok; ok;
_ -> _ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY), timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
do_wait_for_resource_ready(ResId, Retry - 1) do_wait_for_ready(ResId, Retry - 1)
end. end.
safe_call(ResId, Message, Timeout) -> safe_call(ResId, Message, Timeout) ->

View File

@ -32,6 +32,10 @@ roots() -> [].
fields('creation_opts') -> fields('creation_opts') ->
[ [
{health_check_interval, fun health_check_interval/1},
{start_after_created, fun start_after_created/1},
{start_timeout, fun start_timeout/1},
{auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1}, {query_mode, fun query_mode/1},
{resume_interval, fun resume_interval/1}, {resume_interval, fun resume_interval/1},
{async_inflight_window, fun async_inflight_window/1}, {async_inflight_window, fun async_inflight_window/1},
@ -42,6 +46,30 @@ fields('creation_opts') ->
{max_queue_bytes, fun queue_max_bytes/1} {max_queue_bytes, fun queue_max_bytes/1}
]. ].
health_check_interval(type) -> emqx_schema:duration_ms();
health_check_interval(desc) -> ?DESC("health_check_interval");
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
health_check_interval(required) -> false;
health_check_interval(_) -> undefined.
start_after_created(type) -> boolean();
start_after_created(required) -> false;
start_after_created(default) -> ?START_AFTER_CREATED;
start_after_created(desc) -> ?DESC("start_after_created");
start_after_created(_) -> undefined.
start_timeout(type) -> emqx_schema:duration_ms();
start_timeout(desc) -> ?DESC("start_timeout");
start_timeout(default) -> ?START_TIMEOUT_RAW;
start_timeout(required) -> false;
start_timeout(_) -> undefined.
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
auto_restart_interval(desc) -> ?DESC("auto_restart_interval");
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;
auto_restart_interval(required) -> false;
auto_restart_interval(_) -> undefined.
query_mode(type) -> enum([sync, async]); query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode"); query_mode(desc) -> ?DESC("query_mode");
query_mode(default) -> sync; query_mode(default) -> sync;