Merge pull request #8704 from terry-xiaoyu/webhook_async_conf
feat: add more resource creation opts
This commit is contained in:
commit
00966df18f
|
@ -47,7 +47,6 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(DEFAULT_RESOURCE_OPTS, #{
|
-define(DEFAULT_RESOURCE_OPTS, #{
|
||||||
auto_retry_interval => 6000,
|
|
||||||
start_after_created => false
|
start_after_created => false
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,6 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(DEFAULT_RESOURCE_OPTS, #{
|
-define(DEFAULT_RESOURCE_OPTS, #{
|
||||||
auto_retry_interval => 6000,
|
|
||||||
start_after_created => false
|
start_after_created => false
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -89,7 +89,7 @@ create(BridgeId, Conf) ->
|
||||||
create(BridgeType, BridgeName, Conf).
|
create(BridgeType, BridgeName, Conf).
|
||||||
|
|
||||||
create(Type, Name, Conf) ->
|
create(Type, Name, Conf) ->
|
||||||
create(Type, Name, Conf, #{auto_retry_interval => 60000}).
|
create(Type, Name, Conf, #{}).
|
||||||
|
|
||||||
create(Type, Name, Conf, Opts) ->
|
create(Type, Name, Conf, Opts) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
|
@ -169,7 +169,7 @@ recreate(Type, Name, Conf, Opts) ->
|
||||||
resource_id(Type, Name),
|
resource_id(Type, Name),
|
||||||
bridge_to_resource_type(Type),
|
bridge_to_resource_type(Type),
|
||||||
parse_confs(Type, Name, Conf),
|
parse_confs(Type, Name, Conf),
|
||||||
Opts#{auto_retry_interval => 60000}
|
Opts
|
||||||
).
|
).
|
||||||
|
|
||||||
create_dry_run(Type, Conf) ->
|
create_dry_run(Type, Conf) ->
|
||||||
|
|
|
@ -50,7 +50,7 @@ basic_config() ->
|
||||||
default => egress
|
default => egress
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
] ++
|
] ++ webhook_creation_opts() ++
|
||||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
request_config() ->
|
request_config() ->
|
||||||
|
@ -116,6 +116,22 @@ request_config() ->
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
webhook_creation_opts() ->
|
||||||
|
Opts = emqx_resource_schema:fields(creation_opts),
|
||||||
|
lists:filter(
|
||||||
|
fun({K, _V}) ->
|
||||||
|
not lists:member(K, unsupported_opts())
|
||||||
|
end,
|
||||||
|
Opts
|
||||||
|
).
|
||||||
|
|
||||||
|
unsupported_opts() ->
|
||||||
|
[
|
||||||
|
enable_batch,
|
||||||
|
batch_size,
|
||||||
|
batch_time
|
||||||
|
].
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
|
||||||
type_field() ->
|
type_field() ->
|
||||||
|
|
|
@ -1,4 +1,58 @@
|
||||||
emqx_resource_schema {
|
emqx_resource_schema {
|
||||||
|
|
||||||
|
health_check_interval {
|
||||||
|
desc {
|
||||||
|
en: """Health check interval, in milliseconds."""
|
||||||
|
zh: """健康检查间隔,单位毫秒。"""
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: """Health Check Interval"""
|
||||||
|
zh: """健康检查间隔"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start_after_created {
|
||||||
|
desc {
|
||||||
|
en: """Whether start the resource right after created."""
|
||||||
|
zh: """是否在创建资源后立即启动资源。"""
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: """Start After Created"""
|
||||||
|
zh: """创建后立即启动"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start_timeout {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
If 'start_after_created' enabled, how long time do we wait for the
|
||||||
|
resource get started, in milliseconds.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: """Start Timeout"""
|
||||||
|
zh: """启动超时时间"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto_restart_interval {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
The auto restart interval after the resource is disconnected, in milliseconds.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
资源断开以后,自动重连的时间间隔,单位毫秒。
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: """Auto Restart Interval"""
|
||||||
|
zh: """自动重连间隔"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
query_mode {
|
query_mode {
|
||||||
desc {
|
desc {
|
||||||
en: """Query mode. Optional 'sync/async', default 'sync'."""
|
en: """Query mode. Optional 'sync/async', default 'sync'."""
|
||||||
|
@ -6,7 +60,7 @@ emqx_resource_schema {
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: """query_mode"""
|
en: """query_mode"""
|
||||||
zh: """query_mode"""
|
zh: """请求模式"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,18 +41,25 @@
|
||||||
}.
|
}.
|
||||||
-type resource_group() :: binary().
|
-type resource_group() :: binary().
|
||||||
-type creation_opts() :: #{
|
-type creation_opts() :: #{
|
||||||
health_check_interval => integer(),
|
%%======================================= Deprecated Opts:
|
||||||
|
%% use health_check_interval instead
|
||||||
health_check_timeout => integer(),
|
health_check_timeout => integer(),
|
||||||
%% We can choose to block the return of emqx_resource:start until
|
%% use start_timeout instead
|
||||||
%% the resource connected, wait max to `wait_for_resource_ready` ms.
|
|
||||||
wait_for_resource_ready => integer(),
|
wait_for_resource_ready => integer(),
|
||||||
|
%% use auto_restart_interval instead
|
||||||
|
auto_retry_interval => integer(),
|
||||||
|
%%======================================= Deprecated Opts End
|
||||||
|
health_check_interval => integer(),
|
||||||
|
%% We can choose to block the return of emqx_resource:start until
|
||||||
|
%% the resource connected, wait max to `start_timeout` ms.
|
||||||
|
start_timeout => integer(),
|
||||||
%% If `start_after_created` is set to true, the resource is started right
|
%% If `start_after_created` is set to true, the resource is started right
|
||||||
%% after it is created. But note that a `started` resource is not guaranteed
|
%% after it is created. But note that a `started` resource is not guaranteed
|
||||||
%% to be `connected`.
|
%% to be `connected`.
|
||||||
start_after_created => boolean(),
|
start_after_created => boolean(),
|
||||||
%% If the resource disconnected, we can set to retry starting the resource
|
%% If the resource disconnected, we can set to retry starting the resource
|
||||||
%% periodically.
|
%% periodically.
|
||||||
auto_retry_interval => integer(),
|
auto_restart_interval => integer(),
|
||||||
enable_batch => boolean(),
|
enable_batch => boolean(),
|
||||||
batch_size => integer(),
|
batch_size => integer(),
|
||||||
batch_time => integer(),
|
batch_time => integer(),
|
||||||
|
@ -68,17 +75,17 @@
|
||||||
| {error, term()}
|
| {error, term()}
|
||||||
| {resource_down, term()}.
|
| {resource_down, term()}.
|
||||||
|
|
||||||
%% count
|
|
||||||
-define(DEFAULT_BATCH_SIZE, 100).
|
|
||||||
%% milliseconds
|
|
||||||
-define(DEFAULT_BATCH_TIME, 10).
|
|
||||||
|
|
||||||
%% bytes
|
|
||||||
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
|
-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
|
||||||
|
-define(DEFAULT_BATCH_SIZE, 100).
|
||||||
|
-define(DEFAULT_BATCH_TIME, 10).
|
||||||
-define(DEFAULT_INFLIGHT, 100).
|
-define(DEFAULT_INFLIGHT, 100).
|
||||||
|
-define(HEALTHCHECK_INTERVAL, 15000).
|
||||||
|
-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
|
||||||
-define(RESUME_INTERVAL, 15000).
|
-define(RESUME_INTERVAL, 15000).
|
||||||
|
-define(START_AFTER_CREATED, true).
|
||||||
|
-define(START_TIMEOUT, 5000).
|
||||||
|
-define(START_TIMEOUT_RAW, <<"5s">>).
|
||||||
|
-define(AUTO_RESTART_INTERVAL, 60000).
|
||||||
|
-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>).
|
||||||
-define(TEST_ID_PREFIX, "_test_:").
|
-define(TEST_ID_PREFIX, "_test_:").
|
||||||
-define(RES_METRICS, resource_metrics).
|
-define(RES_METRICS, resource_metrics).
|
||||||
|
|
|
@ -282,10 +282,9 @@ get_instance(ResId) ->
|
||||||
fetch_creation_opts(Opts) ->
|
fetch_creation_opts(Opts) ->
|
||||||
SupportedOpts = [
|
SupportedOpts = [
|
||||||
health_check_interval,
|
health_check_interval,
|
||||||
health_check_timeout,
|
start_timeout,
|
||||||
wait_for_resource_ready,
|
|
||||||
start_after_created,
|
start_after_created,
|
||||||
auto_retry_interval,
|
auto_restart_interval,
|
||||||
enable_batch,
|
enable_batch,
|
||||||
batch_size,
|
batch_size,
|
||||||
batch_time,
|
batch_time,
|
||||||
|
|
|
@ -57,7 +57,6 @@
|
||||||
-type data() :: #data{}.
|
-type data() :: #data{}.
|
||||||
|
|
||||||
-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
|
-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
|
||||||
-define(HEALTHCHECK_INTERVAL, 15000).
|
|
||||||
-define(ETS_TABLE, ?MODULE).
|
-define(ETS_TABLE, ?MODULE).
|
||||||
-define(WAIT_FOR_RESOURCE_DELAY, 100).
|
-define(WAIT_FOR_RESOURCE_DELAY, 100).
|
||||||
-define(T_OPERATION, 5000).
|
-define(T_OPERATION, 5000).
|
||||||
|
@ -127,9 +126,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
||||||
[matched]
|
[matched]
|
||||||
),
|
),
|
||||||
ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
|
ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
|
||||||
case maps:get(start_after_created, Opts, true) of
|
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
|
||||||
true ->
|
true ->
|
||||||
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
|
wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
@ -147,7 +146,7 @@ create_dry_run(ResourceType, Config) ->
|
||||||
ok = emqx_resource_manager_sup:ensure_child(
|
ok = emqx_resource_manager_sup:ensure_child(
|
||||||
MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
|
MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
|
||||||
),
|
),
|
||||||
case wait_for_resource_ready(ResId, 15000) of
|
case wait_for_ready(ResId, 15000) of
|
||||||
ok ->
|
ok ->
|
||||||
remove(ResId);
|
remove(ResId);
|
||||||
timeout ->
|
timeout ->
|
||||||
|
@ -170,7 +169,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
|
||||||
restart(ResId, Opts) when is_binary(ResId) ->
|
restart(ResId, Opts) when is_binary(ResId) ->
|
||||||
case safe_call(ResId, restart, ?T_OPERATION) of
|
case safe_call(ResId, restart, ?T_OPERATION) of
|
||||||
ok ->
|
ok ->
|
||||||
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
|
||||||
ok;
|
ok;
|
||||||
{error, _Reason} = Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
|
@ -181,7 +180,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
|
||||||
start(ResId, Opts) ->
|
start(ResId, Opts) ->
|
||||||
case safe_call(ResId, start, ?T_OPERATION) of
|
case safe_call(ResId, start, ?T_OPERATION) of
|
||||||
ok ->
|
ok ->
|
||||||
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)),
|
||||||
ok;
|
ok;
|
||||||
{error, _Reason} = Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
|
@ -422,7 +421,7 @@ get_owner(ResId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_disconnected_state_enter(Data) ->
|
handle_disconnected_state_enter(Data) ->
|
||||||
case maps:get(auto_retry_interval, Data#data.opts, undefined) of
|
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{next_state, disconnected, Data};
|
{next_state, disconnected, Data};
|
||||||
RetryInterval ->
|
RetryInterval ->
|
||||||
|
@ -573,19 +572,19 @@ data_record_to_external_map_with_metrics(Data) ->
|
||||||
metrics => get_metrics(Data#data.id)
|
metrics => get_metrics(Data#data.id)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout.
|
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout.
|
||||||
wait_for_resource_ready(ResId, WaitTime) ->
|
wait_for_ready(ResId, WaitTime) ->
|
||||||
do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
|
do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
|
||||||
|
|
||||||
do_wait_for_resource_ready(_ResId, 0) ->
|
do_wait_for_ready(_ResId, 0) ->
|
||||||
timeout;
|
timeout;
|
||||||
do_wait_for_resource_ready(ResId, Retry) ->
|
do_wait_for_ready(ResId, Retry) ->
|
||||||
case ets_lookup(ResId) of
|
case ets_lookup(ResId) of
|
||||||
{ok, _Group, #{status := connected}} ->
|
{ok, _Group, #{status := connected}} ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
||||||
do_wait_for_resource_ready(ResId, Retry - 1)
|
do_wait_for_ready(ResId, Retry - 1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
safe_call(ResId, Message, Timeout) ->
|
safe_call(ResId, Message, Timeout) ->
|
||||||
|
|
|
@ -32,6 +32,10 @@ roots() -> [].
|
||||||
|
|
||||||
fields('creation_opts') ->
|
fields('creation_opts') ->
|
||||||
[
|
[
|
||||||
|
{health_check_interval, fun health_check_interval/1},
|
||||||
|
{start_after_created, fun start_after_created/1},
|
||||||
|
{start_timeout, fun start_timeout/1},
|
||||||
|
{auto_restart_interval, fun auto_restart_interval/1},
|
||||||
{query_mode, fun query_mode/1},
|
{query_mode, fun query_mode/1},
|
||||||
{resume_interval, fun resume_interval/1},
|
{resume_interval, fun resume_interval/1},
|
||||||
{async_inflight_window, fun async_inflight_window/1},
|
{async_inflight_window, fun async_inflight_window/1},
|
||||||
|
@ -42,6 +46,30 @@ fields('creation_opts') ->
|
||||||
{max_queue_bytes, fun queue_max_bytes/1}
|
{max_queue_bytes, fun queue_max_bytes/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
health_check_interval(type) -> emqx_schema:duration_ms();
|
||||||
|
health_check_interval(desc) -> ?DESC("health_check_interval");
|
||||||
|
health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW;
|
||||||
|
health_check_interval(required) -> false;
|
||||||
|
health_check_interval(_) -> undefined.
|
||||||
|
|
||||||
|
start_after_created(type) -> boolean();
|
||||||
|
start_after_created(required) -> false;
|
||||||
|
start_after_created(default) -> ?START_AFTER_CREATED;
|
||||||
|
start_after_created(desc) -> ?DESC("start_after_created");
|
||||||
|
start_after_created(_) -> undefined.
|
||||||
|
|
||||||
|
start_timeout(type) -> emqx_schema:duration_ms();
|
||||||
|
start_timeout(desc) -> ?DESC("start_timeout");
|
||||||
|
start_timeout(default) -> ?START_TIMEOUT_RAW;
|
||||||
|
start_timeout(required) -> false;
|
||||||
|
start_timeout(_) -> undefined.
|
||||||
|
|
||||||
|
auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
||||||
|
auto_restart_interval(desc) -> ?DESC("auto_restart_interval");
|
||||||
|
auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW;
|
||||||
|
auto_restart_interval(required) -> false;
|
||||||
|
auto_restart_interval(_) -> undefined.
|
||||||
|
|
||||||
query_mode(type) -> enum([sync, async]);
|
query_mode(type) -> enum([sync, async]);
|
||||||
query_mode(desc) -> ?DESC("query_mode");
|
query_mode(desc) -> ?DESC("query_mode");
|
||||||
query_mode(default) -> sync;
|
query_mode(default) -> sync;
|
||||||
|
|
Loading…
Reference in New Issue