From 06363e63d9caf61abacae9b162e97fcbfc314308 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 18 Aug 2022 16:00:04 +0800 Subject: [PATCH] fix(influxdb): connector use a fallbacke `pool_size` for influxdb client --- .../src/emqx_connector_schema_lib.erl | 2 ++ .../i18n/emqx_resource_schema_i18n.conf | 11 +++++++++++ apps/emqx_resource/include/emqx_resource.hrl | 19 +++++++++++-------- .../src/emqx_resource_worker_sup.erl | 12 ++++++------ .../src/schema/emqx_resource_schema.erl | 7 +++++++ .../i18n/emqx_ee_connector_influxdb.conf | 10 ---------- .../src/emqx_ee_connector_influxdb.erl | 13 ++++--------- 7 files changed, 41 insertions(+), 33 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index dd85566ed..3bd29a9c1 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -68,6 +68,8 @@ ssl_fields() -> relational_db_fields() -> [ {database, fun database/1}, + %% TODO: The `pool_size` for drivers will be deprecated. Ues `worker_pool_size` for emqx_resource + %% See emqx_resource.hrl {pool_size, fun pool_size/1}, {username, fun username/1}, {password, fun password/1}, diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index ce4c7e3b0..43a32288d 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -22,6 +22,17 @@ emqx_resource_schema { } } + worker_pool_size { + desc { + en: """Resource worker pool size.""" + zh: """资源连接池大小。""" + } + label { + en: """Worker Pool Size""" + zh: """资源连接池大小""" + } + } + health_check_interval { desc { en: """Health check interval, in milliseconds.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 04b3f16ea..4bbb4beb6 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -49,25 +49,26 @@ %% use auto_restart_interval instead auto_retry_interval => integer(), %%======================================= Deprecated Opts End - health_check_interval => integer(), + worker_pool_size => pos_integer(), + health_check_interval => pos_integer(), %% We can choose to block the return of emqx_resource:start until %% the resource connected, wait max to `start_timeout` ms. - start_timeout => integer(), + start_timeout => pos_integer(), %% If `start_after_created` is set to true, the resource is started right %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_restart_interval => integer(), + auto_restart_interval => pos_integer(), enable_batch => boolean(), - batch_size => integer(), - batch_time => integer(), + batch_size => pos_integer(), + batch_time => pos_integer(), enable_queue => boolean(), - queue_max_bytes => integer(), + queue_max_bytes => pos_integer(), query_mode => async | sync | dynamic, - resume_interval => integer(), - async_inflight_window => integer() + resume_interval => pos_integer(), + async_inflight_window => pos_integer() }. -type query_result() :: ok @@ -75,6 +76,8 @@ | {error, term()} | {resource_down, term()}. +-define(WORKER_POOL_SIZE, 16). + -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index a2b3a1ba5..5305eddaf 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -53,23 +53,23 @@ init([]) -> {ok, {SupFlags, ChildSpecs}}. start_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), - _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), + WorkerPoolSize = worker_pool_size(Opts), + _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), lists:foreach( fun(Idx) -> _ = ensure_worker_added(ResId, Idx), ok = ensure_worker_started(ResId, Idx, Opts) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ). stop_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), + WorkerPoolSize = worker_pool_size(Opts), lists:foreach( fun(Idx) -> ensure_worker_removed(ResId, Idx) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ), ensure_worker_pool_removed(ResId), ok. @@ -77,7 +77,7 @@ stop_workers(ResId, Opts) -> %%%============================================================================= %%% Internal %%%============================================================================= -pool_size(Opts) -> +worker_pool_size(Opts) -> maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). ensure_worker_pool(ResId, Type, Opts) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 2272234f2..77d9c3659 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -44,6 +44,7 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ + {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, @@ -57,6 +58,12 @@ fields("creation_opts") -> {max_queue_bytes, fun queue_max_bytes/1} ]. +worker_pool_size(type) -> pos_integer(); +worker_pool_size(desc) -> ?DESC("worker_pool_size"); +worker_pool_size(default) -> ?WORKER_POOL_SIZE; +worker_pool_size(required) -> false; +worker_pool_size(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index 4d2dc168c..ff2266de5 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -150,15 +150,5 @@ emqx_ee_connector_influxdb { zh: """时间精度""" } } - pool_size { - desc { - en: """InfluxDB Pool Size. Default value is CPU threads.""" - zh: """InfluxDB 连接池大小,默认为 CPU 线程数。""" - } - label { - en: """InfluxDB Pool Size""" - zh: """InfluxDB 连接池大小""" - } - } } diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 09a09aa44..5ec96bf2c 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -135,8 +135,7 @@ fields(basic) -> {precision, mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") - })}, - {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})} + })} ]; fields(influxdb_udp) -> fields(basic); @@ -190,15 +189,13 @@ values(udp, put) -> #{ host => <<"127.0.0.1">>, port => 8089, - precision => ms, - pool_size => 8 + precision => ms }; values(api_v1, put) -> #{ host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, database => <<"my_db">>, username => <<"my_user">>, password => <<"my_password">>, @@ -209,7 +206,6 @@ values(api_v2, put) -> host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, bucket => <<"my_bucket">>, org => <<"my_org">>, token => <<"my_token">>, @@ -302,14 +298,13 @@ client_config( InstId, Config = #{ host := Host, - port := Port, - pool_size := PoolSize + port := Port } ) -> [ {host, binary_to_list(Host)}, {port, Port}, - {pool_size, PoolSize}, + {pool_size, erlang:system_info(schedulers)}, {pool, binary_to_atom(InstId, utf8)}, {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} ] ++ protocol_config(Config).