From 0cdf4b47f11264389a283b5ea164b1d0f35e352b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 12 Aug 2022 11:33:19 +0800 Subject: [PATCH] feat: add more resource creation opts --- apps/emqx_authn/src/emqx_authn_utils.erl | 1 - apps/emqx_authz/src/emqx_authz_utils.erl | 1 - apps/emqx_bridge/src/emqx_bridge_resource.erl | 4 +- .../src/schema/emqx_bridge_webhook_schema.erl | 18 +++++- .../i18n/emqx_resource_schema_i18n.conf | 56 ++++++++++++++++++- apps/emqx_resource/include/emqx_resource.hrl | 33 ++++++----- apps/emqx_resource/src/emqx_resource.erl | 5 +- .../src/emqx_resource_manager.erl | 25 ++++----- .../src/schema/emqx_resource_schema.erl | 28 ++++++++++ 9 files changed, 136 insertions(+), 35 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index 994f2f275..dca243cc1 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -47,7 +47,6 @@ ]). -define(DEFAULT_RESOURCE_OPTS, #{ - auto_retry_interval => 6000, start_after_created => false }). diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index dd6f66c7f..0048c0dc4 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -40,7 +40,6 @@ ]). -define(DEFAULT_RESOURCE_OPTS, #{ - auto_retry_interval => 6000, start_after_created => false }). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d34c30ee9..f7aeec30d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -89,7 +89,7 @@ create(BridgeId, Conf) -> create(BridgeType, BridgeName, Conf). create(Type, Name, Conf) -> - create(Type, Name, Conf, #{auto_retry_interval => 60000}). + create(Type, Name, Conf, #{}). create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ @@ -169,7 +169,7 @@ recreate(Type, Name, Conf, Opts) -> resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - Opts#{auto_retry_interval => 60000} + Opts ). create_dry_run(Type, Conf) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index d833e6ca8..d51c2edef 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -50,7 +50,7 @@ basic_config() -> default => egress } )} - ] ++ + ] ++ webhook_creation_opts() ++ proplists:delete(base_url, emqx_connector_http:fields(config)). request_config() -> @@ -116,6 +116,22 @@ request_config() -> )} ]. +webhook_creation_opts() -> + Opts = emqx_resource_schema:fields(creation_opts), + lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + Opts + ). + +unsupported_opts() -> + [ + enable_batch, + batch_size, + batch_time + ]. + %%====================================================================================== type_field() -> diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 9a2234ff5..3ec170ebf 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -1,4 +1,58 @@ emqx_resource_schema { + + health_check_interval { + desc { + en: """Health check interval, in milliseconds.""" + zh: """健康检查间隔,单位毫秒。""" + } + label { + en: """Health Check Interval""" + zh: """健康检查间隔""" + } + } + + start_after_created { + desc { + en: """Whether start the resource right after created.""" + zh: """是否在创建资源后立即启动资源。""" + } + label { + en: """Start After Created""" + zh: """创建后立即启动""" + } + } + + start_timeout { + desc { + en: """ +If 'start_after_created' enabled, how long time do we wait for the +resource get started, in milliseconds. +""" + zh: """ +如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。 +""" + } + label { + en: """Start Timeout""" + zh: """启动超时时间""" + } + } + + auto_restart_interval { + desc { + en: """ +The auto restart interval after the resource is disconnected, in milliseconds. +""" + zh: """ +资源断开以后,自动重连的时间间隔,单位毫秒。 +""" + } + label { + en: """Auto Restart Interval""" + zh: """自动重连间隔""" + } + } + query_mode { desc { en: """Query mode. Optional 'sync/async', default 'sync'.""" @@ -6,7 +60,7 @@ emqx_resource_schema { } label { en: """query_mode""" - zh: """query_mode""" + zh: """请求模式""" } } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index ed8929831..190c278ae 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -41,18 +41,25 @@ }. -type resource_group() :: binary(). -type creation_opts() :: #{ - health_check_interval => integer(), + %%======================================= Deprecated Opts: + %% use health_check_interval instead health_check_timeout => integer(), - %% We can choose to block the return of emqx_resource:start until - %% the resource connected, wait max to `wait_for_resource_ready` ms. + %% use start_timeout instead wait_for_resource_ready => integer(), + %% use auto_restart_interval instead + auto_retry_interval => integer(), + %%======================================= Deprecated Opts End + health_check_interval => integer(), + %% We can choose to block the return of emqx_resource:start until + %% the resource connected, wait max to `start_timeout` ms. + start_timeout => integer(), %% If `start_after_created` is set to true, the resource is started right %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_retry_interval => integer(), + auto_restart_interval => integer(), enable_batch => boolean(), batch_size => integer(), batch_time => integer(), @@ -68,17 +75,17 @@ | {error, term()} | {resource_down, term()}. -%% count --define(DEFAULT_BATCH_SIZE, 100). -%% milliseconds --define(DEFAULT_BATCH_TIME, 10). - -%% bytes -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). - +-define(DEFAULT_BATCH_SIZE, 100). +-define(DEFAULT_BATCH_TIME, 10). -define(DEFAULT_INFLIGHT, 100). - +-define(HEALTHCHECK_INTERVAL, 15000). +-define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). -define(RESUME_INTERVAL, 15000). - +-define(START_AFTER_CREATED, true). +-define(START_TIMEOUT, 5000). +-define(START_TIMEOUT_RAW, <<"5s">>). +-define(AUTO_RESTART_INTERVAL, 60000). +-define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). -define(TEST_ID_PREFIX, "_test_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c4fd24007..60f0dd360 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -282,10 +282,9 @@ get_instance(ResId) -> fetch_creation_opts(Opts) -> SupportedOpts = [ health_check_interval, - health_check_timeout, - wait_for_resource_ready, + start_timeout, start_after_created, - auto_retry_interval, + auto_restart_interval, enable_batch, batch_size, batch_time, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 608548898..66d9e32b0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -57,7 +57,6 @@ -type data() :: #data{}. -define(SHORT_HEALTHCHECK_INTERVAL, 1000). --define(HEALTHCHECK_INTERVAL, 15000). -define(ETS_TABLE, ?MODULE). -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). @@ -127,9 +126,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> [matched] ), ok = emqx_resource_worker_sup:start_workers(ResId, Opts), - case maps:get(start_after_created, Opts, true) of + case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> - wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)); + wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); false -> ok end, @@ -147,7 +146,7 @@ create_dry_run(ResourceType, Config) -> ok = emqx_resource_manager_sup:ensure_child( MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{} ), - case wait_for_resource_ready(ResId, 15000) of + case wait_for_ready(ResId, 15000) of ok -> remove(ResId); timeout -> @@ -170,7 +169,7 @@ remove(ResId, ClearMetrics) when is_binary(ResId) -> restart(ResId, Opts) when is_binary(ResId) -> case safe_call(ResId, restart, ?T_OPERATION) of ok -> - wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), + wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), ok; {error, _Reason} = Error -> Error @@ -181,7 +180,7 @@ restart(ResId, Opts) when is_binary(ResId) -> start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> - wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), + wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), ok; {error, _Reason} = Error -> Error @@ -422,7 +421,7 @@ get_owner(ResId) -> end. handle_disconnected_state_enter(Data) -> - case maps:get(auto_retry_interval, Data#data.opts, undefined) of + case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> {next_state, disconnected, Data}; RetryInterval -> @@ -573,19 +572,19 @@ data_record_to_external_map_with_metrics(Data) -> metrics => get_metrics(Data#data.id) }. --spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout. -wait_for_resource_ready(ResId, WaitTime) -> - do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). +-spec wait_for_ready(resource_id(), integer()) -> ok | timeout. +wait_for_ready(ResId, WaitTime) -> + do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). -do_wait_for_resource_ready(_ResId, 0) -> +do_wait_for_ready(_ResId, 0) -> timeout; -do_wait_for_resource_ready(ResId, Retry) -> +do_wait_for_ready(ResId, Retry) -> case ets_lookup(ResId) of {ok, _Group, #{status := connected}} -> ok; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), - do_wait_for_resource_ready(ResId, Retry - 1) + do_wait_for_ready(ResId, Retry - 1) end. safe_call(ResId, Message, Timeout) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 464c055b7..ccc31a707 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -32,6 +32,10 @@ roots() -> []. fields('creation_opts') -> [ + {health_check_interval, fun health_check_interval/1}, + {start_after_created, fun start_after_created/1}, + {start_timeout, fun start_timeout/1}, + {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, {resume_interval, fun resume_interval/1}, {async_inflight_window, fun async_inflight_window/1}, @@ -42,6 +46,30 @@ fields('creation_opts') -> {max_queue_bytes, fun queue_max_bytes/1} ]. +health_check_interval(type) -> emqx_schema:duration_ms(); +health_check_interval(desc) -> ?DESC("health_check_interval"); +health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; +health_check_interval(required) -> false; +health_check_interval(_) -> undefined. + +start_after_created(type) -> boolean(); +start_after_created(required) -> false; +start_after_created(default) -> ?START_AFTER_CREATED; +start_after_created(desc) -> ?DESC("start_after_created"); +start_after_created(_) -> undefined. + +start_timeout(type) -> emqx_schema:duration_ms(); +start_timeout(desc) -> ?DESC("start_timeout"); +start_timeout(default) -> ?START_TIMEOUT_RAW; +start_timeout(required) -> false; +start_timeout(_) -> undefined. + +auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); +auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); +auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; +auto_restart_interval(required) -> false; +auto_restart_interval(_) -> undefined. + query_mode(type) -> enum([sync, async]); query_mode(desc) -> ?DESC("query_mode"); query_mode(default) -> sync;