diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 574305a4f..a76346bad 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1902,6 +1902,7 @@ common_ssl_opts_schema(Defaults) -> sensitive => true, required => false, example => <<"">>, + format => <<"password">>, desc => ?DESC(common_ssl_opts_schema_password) } )}, diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index dd85566ed..53643c9f9 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -68,6 +68,8 @@ ssl_fields() -> relational_db_fields() -> [ {database, fun database/1}, + %% TODO: The `pool_size` for drivers will be deprecated. Ues `worker_pool_size` for emqx_resource + %% See emqx_resource.hrl {pool_size, fun pool_size/1}, {username, fun username/1}, {password, fun password/1}, @@ -102,6 +104,7 @@ username(_) -> undefined. password(type) -> binary(); password(desc) -> ?DESC("password"); password(required) -> false; +password(format) -> <<"password">>; password(_) -> undefined. auto_reconnect(type) -> boolean(); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 8b6994dd4..4d35583a3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -96,6 +96,7 @@ fields("connector") -> binary(), #{ default => "emqx", + format => <<"password">>, desc => ?DESC("password") } )}, diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index ce4c7e3b0..0d53b813e 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -22,6 +22,17 @@ emqx_resource_schema { } } + worker_pool_size { + desc { + en: """Resource worker pool size.""" + zh: """资源连接池大小。""" + } + label { + en: """Worker Pool Size""" + zh: """资源连接池大小""" + } + } + health_check_interval { desc { en: """Health check interval, in milliseconds.""" @@ -134,8 +145,8 @@ emqx_resource_schema { queue_max_bytes { desc { - en: """Maximum queue storage size in bytes.""" - zh: """消息队列的最大长度,以字节计。""" + en: """Maximum queue storage.""" + zh: """消息队列的最大长度。""" } label { en: """Queue max bytes""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 04b3f16ea..3f2cac46b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -49,25 +49,27 @@ %% use auto_restart_interval instead auto_retry_interval => integer(), %%======================================= Deprecated Opts End + worker_pool_size => pos_integer(), + %% use `integer()` compatibility to release 5.0.0 bpapi health_check_interval => integer(), %% We can choose to block the return of emqx_resource:start until %% the resource connected, wait max to `start_timeout` ms. - start_timeout => integer(), + start_timeout => pos_integer(), %% If `start_after_created` is set to true, the resource is started right %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_restart_interval => integer(), + auto_restart_interval => pos_integer(), enable_batch => boolean(), - batch_size => integer(), - batch_time => integer(), + batch_size => pos_integer(), + batch_time => pos_integer(), enable_queue => boolean(), - queue_max_bytes => integer(), + queue_max_bytes => pos_integer(), query_mode => async | sync | dynamic, - resume_interval => integer(), - async_inflight_window => integer() + resume_interval => pos_integer(), + async_inflight_window => pos_integer() }. -type query_result() :: ok @@ -75,6 +77,8 @@ | {error, term()} | {resource_down, term()}. +-define(WORKER_POOL_SIZE, 16). + -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). @@ -92,12 +96,6 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). --define(START_AFTER_CREATED, true). - -%% milliseconds --define(START_TIMEOUT, 5000). --define(START_TIMEOUT_RAW, <<"5s">>). - %% milliseconds -define(AUTO_RESTART_INTERVAL, 60000). -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 07abd4007..2f6964380 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -114,6 +114,11 @@ create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} = lookup(ResId), {ok, Data}. +%% internal configs +-define(START_AFTER_CREATED, true). +%% in milliseconds +-define(START_TIMEOUT, 5000). + %% @doc Create a resource_manager and wait until it is running create(MgrId, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init @@ -379,9 +384,17 @@ handle_event(EventType, EventData, State, Data) -> %%------------------------------------------------------------------------------ insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> case get_owner(ResId) of - not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); - MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); - _ -> self() ! quit + not_found -> + ets:insert(?ETS_TABLE, {ResId, Group, Data}); + MgrId -> + ets:insert(?ETS_TABLE, {ResId, Group, Data}); + _ -> + ?SLOG(error, #{ + msg => get_resource_owner_failed, + resource_id => ResId, + action => quit_resource + }), + self() ! quit end. read_cache(ResId) -> @@ -420,12 +433,14 @@ get_owner(ResId) -> end. handle_disconnected_state_enter(Data) -> + {next_state, disconnected, Data, retry_actions(Data)}. + +retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> - {next_state, disconnected, Data}; + []; RetryInterval -> - Actions = [{state_timeout, RetryInterval, auto_retry}], - {next_state, disconnected, Data, Actions} + [{state_timeout, RetryInterval, auto_retry}] end. handle_remove_event(From, ClearMetrics, Data) -> @@ -456,7 +471,7 @@ start_resource(Data, From) -> %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. UpdatedData = Data#data{error = Reason}, - Actions = maybe_reply([], From, Err), + Actions = maybe_reply(retry_actions(UpdatedData), From, Err), {next_state, disconnected, UpdatedData, Actions} end. diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index a2b3a1ba5..5305eddaf 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -53,23 +53,23 @@ init([]) -> {ok, {SupFlags, ChildSpecs}}. start_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), - _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), + WorkerPoolSize = worker_pool_size(Opts), + _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), lists:foreach( fun(Idx) -> _ = ensure_worker_added(ResId, Idx), ok = ensure_worker_started(ResId, Idx, Opts) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ). stop_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), + WorkerPoolSize = worker_pool_size(Opts), lists:foreach( fun(Idx) -> ensure_worker_removed(ResId, Idx) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ), ensure_worker_pool_removed(ResId), ok. @@ -77,7 +77,7 @@ stop_workers(ResId, Opts) -> %%%============================================================================= %%% Internal %%%============================================================================= -pool_size(Opts) -> +worker_pool_size(Opts) -> maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). ensure_worker_pool(ResId, Type, Opts) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 2272234f2..fe8564a41 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -44,9 +44,8 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ + {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, - {start_after_created, fun start_after_created/1}, - {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, {async_inflight_window, fun async_inflight_window/1}, @@ -57,24 +56,18 @@ fields("creation_opts") -> {max_queue_bytes, fun queue_max_bytes/1} ]. +worker_pool_size(type) -> pos_integer(); +worker_pool_size(desc) -> ?DESC("worker_pool_size"); +worker_pool_size(default) -> ?WORKER_POOL_SIZE; +worker_pool_size(required) -> false; +worker_pool_size(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; health_check_interval(required) -> false; health_check_interval(_) -> undefined. -start_after_created(type) -> boolean(); -start_after_created(required) -> false; -start_after_created(default) -> ?START_AFTER_CREATED; -start_after_created(desc) -> ?DESC("start_after_created"); -start_after_created(_) -> undefined. - -start_timeout(type) -> emqx_schema:duration_ms(); -start_timeout(desc) -> ?DESC("start_timeout"); -start_timeout(default) -> ?START_TIMEOUT_RAW; -start_timeout(required) -> false; -start_timeout(_) -> undefined. - auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index 4d2dc168c..ff2266de5 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -150,15 +150,5 @@ emqx_ee_connector_influxdb { zh: """时间精度""" } } - pool_size { - desc { - en: """InfluxDB Pool Size. Default value is CPU threads.""" - zh: """InfluxDB 连接池大小,默认为 CPU 线程数。""" - } - label { - en: """InfluxDB Pool Size""" - zh: """InfluxDB 连接池大小""" - } - } } diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 09a09aa44..d2725c797 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -135,16 +135,15 @@ fields(basic) -> {precision, mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") - })}, - {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})} + })} ]; fields(influxdb_udp) -> fields(basic); fields(influxdb_api_v1) -> [ {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, - {password, mk(binary(), #{required => true, desc => ?DESC("password")})} + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})} ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); fields(influxdb_api_v2) -> [ @@ -190,15 +189,13 @@ values(udp, put) -> #{ host => <<"127.0.0.1">>, port => 8089, - precision => ms, - pool_size => 8 + precision => ms }; values(api_v1, put) -> #{ host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, database => <<"my_db">>, username => <<"my_user">>, password => <<"my_password">>, @@ -209,7 +206,6 @@ values(api_v2, put) -> host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, bucket => <<"my_bucket">>, org => <<"my_org">>, token => <<"my_token">>, @@ -302,14 +298,13 @@ client_config( InstId, Config = #{ host := Host, - port := Port, - pool_size := PoolSize + port := Port } ) -> [ {host, binary_to_list(Host)}, {port, Port}, - {pool_size, PoolSize}, + {pool_size, erlang:system_info(schedulers)}, {pool, binary_to_atom(InstId, utf8)}, {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} ] ++ protocol_config(Config).