From 06363e63d9caf61abacae9b162e97fcbfc314308 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 18 Aug 2022 16:00:04 +0800 Subject: [PATCH 1/5] fix(influxdb): connector use a fallbacke `pool_size` for influxdb client --- .../src/emqx_connector_schema_lib.erl | 2 ++ .../i18n/emqx_resource_schema_i18n.conf | 11 +++++++++++ apps/emqx_resource/include/emqx_resource.hrl | 19 +++++++++++-------- .../src/emqx_resource_worker_sup.erl | 12 ++++++------ .../src/schema/emqx_resource_schema.erl | 7 +++++++ .../i18n/emqx_ee_connector_influxdb.conf | 10 ---------- .../src/emqx_ee_connector_influxdb.erl | 13 ++++--------- 7 files changed, 41 insertions(+), 33 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index dd85566ed..3bd29a9c1 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -68,6 +68,8 @@ ssl_fields() -> relational_db_fields() -> [ {database, fun database/1}, + %% TODO: The `pool_size` for drivers will be deprecated. Ues `worker_pool_size` for emqx_resource + %% See emqx_resource.hrl {pool_size, fun pool_size/1}, {username, fun username/1}, {password, fun password/1}, diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index ce4c7e3b0..43a32288d 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -22,6 +22,17 @@ emqx_resource_schema { } } + worker_pool_size { + desc { + en: """Resource worker pool size.""" + zh: """资源连接池大小。""" + } + label { + en: """Worker Pool Size""" + zh: """资源连接池大小""" + } + } + health_check_interval { desc { en: """Health check interval, in milliseconds.""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 04b3f16ea..4bbb4beb6 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -49,25 +49,26 @@ %% use auto_restart_interval instead auto_retry_interval => integer(), %%======================================= Deprecated Opts End - health_check_interval => integer(), + worker_pool_size => pos_integer(), + health_check_interval => pos_integer(), %% We can choose to block the return of emqx_resource:start until %% the resource connected, wait max to `start_timeout` ms. - start_timeout => integer(), + start_timeout => pos_integer(), %% If `start_after_created` is set to true, the resource is started right %% after it is created. But note that a `started` resource is not guaranteed %% to be `connected`. start_after_created => boolean(), %% If the resource disconnected, we can set to retry starting the resource %% periodically. - auto_restart_interval => integer(), + auto_restart_interval => pos_integer(), enable_batch => boolean(), - batch_size => integer(), - batch_time => integer(), + batch_size => pos_integer(), + batch_time => pos_integer(), enable_queue => boolean(), - queue_max_bytes => integer(), + queue_max_bytes => pos_integer(), query_mode => async | sync | dynamic, - resume_interval => integer(), - async_inflight_window => integer() + resume_interval => pos_integer(), + async_inflight_window => pos_integer() }. -type query_result() :: ok @@ -75,6 +76,8 @@ | {error, term()} | {resource_down, term()}. +-define(WORKER_POOL_SIZE, 16). + -define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index a2b3a1ba5..5305eddaf 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -53,23 +53,23 @@ init([]) -> {ok, {SupFlags, ChildSpecs}}. start_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), - _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]), + WorkerPoolSize = worker_pool_size(Opts), + _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), lists:foreach( fun(Idx) -> _ = ensure_worker_added(ResId, Idx), ok = ensure_worker_started(ResId, Idx, Opts) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ). stop_workers(ResId, Opts) -> - PoolSize = pool_size(Opts), + WorkerPoolSize = worker_pool_size(Opts), lists:foreach( fun(Idx) -> ensure_worker_removed(ResId, Idx) end, - lists:seq(1, PoolSize) + lists:seq(1, WorkerPoolSize) ), ensure_worker_pool_removed(ResId), ok. @@ -77,7 +77,7 @@ stop_workers(ResId, Opts) -> %%%============================================================================= %%% Internal %%%============================================================================= -pool_size(Opts) -> +worker_pool_size(Opts) -> maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)). ensure_worker_pool(ResId, Type, Opts) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 2272234f2..77d9c3659 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -44,6 +44,7 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ + {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, @@ -57,6 +58,12 @@ fields("creation_opts") -> {max_queue_bytes, fun queue_max_bytes/1} ]. +worker_pool_size(type) -> pos_integer(); +worker_pool_size(desc) -> ?DESC("worker_pool_size"); +worker_pool_size(default) -> ?WORKER_POOL_SIZE; +worker_pool_size(required) -> false; +worker_pool_size(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf index 4d2dc168c..ff2266de5 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf @@ -150,15 +150,5 @@ emqx_ee_connector_influxdb { zh: """时间精度""" } } - pool_size { - desc { - en: """InfluxDB Pool Size. Default value is CPU threads.""" - zh: """InfluxDB 连接池大小,默认为 CPU 线程数。""" - } - label { - en: """InfluxDB Pool Size""" - zh: """InfluxDB 连接池大小""" - } - } } diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 09a09aa44..5ec96bf2c 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -135,8 +135,7 @@ fields(basic) -> {precision, mk(enum([ns, us, ms, s, m, h]), #{ required => false, default => ms, desc => ?DESC("precision") - })}, - {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})} + })} ]; fields(influxdb_udp) -> fields(basic); @@ -190,15 +189,13 @@ values(udp, put) -> #{ host => <<"127.0.0.1">>, port => 8089, - precision => ms, - pool_size => 8 + precision => ms }; values(api_v1, put) -> #{ host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, database => <<"my_db">>, username => <<"my_user">>, password => <<"my_password">>, @@ -209,7 +206,6 @@ values(api_v2, put) -> host => <<"127.0.0.1">>, port => 8086, precision => ms, - pool_size => 8, bucket => <<"my_bucket">>, org => <<"my_org">>, token => <<"my_token">>, @@ -302,14 +298,13 @@ client_config( InstId, Config = #{ host := Host, - port := Port, - pool_size := PoolSize + port := Port } ) -> [ {host, binary_to_list(Host)}, {port, Port}, - {pool_size, PoolSize}, + {pool_size, erlang:system_info(schedulers)}, {pool, binary_to_atom(InstId, utf8)}, {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} ] ++ protocol_config(Config). From 2648362c6216c8e5c2408edf098822576d1cf481 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 19 Aug 2022 15:53:22 +0800 Subject: [PATCH 2/5] fix(bridge): password for bridge/db format as `password` for dashboard --- apps/emqx/src/emqx_schema.erl | 1 + apps/emqx_connector/src/emqx_connector_schema_lib.erl | 1 + apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 1 + lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl | 4 ++-- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 574305a4f..a76346bad 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1902,6 +1902,7 @@ common_ssl_opts_schema(Defaults) -> sensitive => true, required => false, example => <<"">>, + format => <<"password">>, desc => ?DESC(common_ssl_opts_schema_password) } )}, diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index 3bd29a9c1..f37640538 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -104,6 +104,7 @@ username(_) -> undefined. password(type) -> binary(); password(desc) -> ?DESC("password"); password(required) -> false; +password(format) -> <<"format">>; password(_) -> undefined. auto_reconnect(type) -> boolean(); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 8b6994dd4..4d35583a3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -96,6 +96,7 @@ fields("connector") -> binary(), #{ default => "emqx", + format => <<"password">>, desc => ?DESC("password") } )}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 5ec96bf2c..d2725c797 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -142,8 +142,8 @@ fields(influxdb_udp) -> fields(influxdb_api_v1) -> [ {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, - {password, mk(binary(), #{required => true, desc => ?DESC("password")})} + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, mk(binary(), #{desc => ?DESC("password"), format => <<"password">>})} ] ++ emqx_connector_schema_lib:ssl_fields() ++ fields(basic); fields(influxdb_api_v2) -> [ From 7c4ea38c06a7d44b07d786467e31ab6bcc64cfae Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 22 Aug 2022 02:20:54 +0800 Subject: [PATCH 3/5] fix(resource): make some resource opts internal Resource options `start_after_created` and `start_timeout` are internal opts. Not provided to users anymore. --- .../i18n/emqx_resource_schema_i18n.conf | 4 ++-- apps/emqx_resource/include/emqx_resource.hrl | 6 ------ apps/emqx_resource/src/emqx_resource_manager.erl | 5 +++++ .../src/schema/emqx_resource_schema.erl | 14 -------------- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 43a32288d..0d53b813e 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -145,8 +145,8 @@ emqx_resource_schema { queue_max_bytes { desc { - en: """Maximum queue storage size in bytes.""" - zh: """消息队列的最大长度,以字节计。""" + en: """Maximum queue storage.""" + zh: """消息队列的最大长度。""" } label { en: """Queue max bytes""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 4bbb4beb6..bb8cb02bf 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -95,12 +95,6 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>). --define(START_AFTER_CREATED, true). - -%% milliseconds --define(START_TIMEOUT, 5000). --define(START_TIMEOUT_RAW, <<"5s">>). - %% milliseconds -define(AUTO_RESTART_INTERVAL, 60000). -define(AUTO_RESTART_INTERVAL_RAW, <<"60s">>). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 07abd4007..382cf29d9 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -114,6 +114,11 @@ create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} = lookup(ResId), {ok, Data}. +%% internal configs +-define(START_AFTER_CREATED, true). +%% in milliseconds +-define(START_TIMEOUT, 5000). + %% @doc Create a resource_manager and wait until it is running create(MgrId, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 77d9c3659..fe8564a41 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -46,8 +46,6 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, - {start_after_created, fun start_after_created/1}, - {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, {async_inflight_window, fun async_inflight_window/1}, @@ -70,18 +68,6 @@ health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; health_check_interval(required) -> false; health_check_interval(_) -> undefined. -start_after_created(type) -> boolean(); -start_after_created(required) -> false; -start_after_created(default) -> ?START_AFTER_CREATED; -start_after_created(desc) -> ?DESC("start_after_created"); -start_after_created(_) -> undefined. - -start_timeout(type) -> emqx_schema:duration_ms(); -start_timeout(desc) -> ?DESC("start_timeout"); -start_timeout(default) -> ?START_TIMEOUT_RAW; -start_timeout(required) -> false; -start_timeout(_) -> undefined. - auto_restart_interval(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); auto_restart_interval(desc) -> ?DESC("auto_restart_interval"); auto_restart_interval(default) -> ?AUTO_RESTART_INTERVAL_RAW; From 62ecf6f545652bcff3914852f22cac5f66e3aeb0 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 22 Aug 2022 02:34:25 +0800 Subject: [PATCH 4/5] fix(resource): keep `auto_retry` in `disconnected` state Automatic retries should be maintained even in `disconnected` state without any state transition. --- .../src/emqx_resource_manager.erl | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 382cf29d9..226f9e927 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -384,9 +384,33 @@ handle_event(EventType, EventData, State, Data) -> %%------------------------------------------------------------------------------ insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> case get_owner(ResId) of - not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); - MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); - _ -> self() ! quit + not_found -> + ?SLOG( + debug, + #{ + msg => resource_owner_not_found, + resource_id => ResId, + action => auto_insert_cache + } + ), + ets:insert(?ETS_TABLE, {ResId, Group, Data}); + MgrId -> + ?SLOG( + debug, + #{ + msg => resource_owner_matched, + resource_id => ResId, + action => reinsert_cache + } + ), + ets:insert(?ETS_TABLE, {ResId, Group, Data}); + _ -> + ?SLOG(error, #{ + msg => get_resource_owner_failed, + resource_id => ResId, + action => quit_rusource + }), + self() ! quit end. read_cache(ResId) -> @@ -425,12 +449,14 @@ get_owner(ResId) -> end. handle_disconnected_state_enter(Data) -> + {next_state, disconnected, Data, retry_actions(Data)}. + +retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> - {next_state, disconnected, Data}; + []; RetryInterval -> - Actions = [{state_timeout, RetryInterval, auto_retry}], - {next_state, disconnected, Data, Actions} + [{state_timeout, RetryInterval, auto_retry}] end. handle_remove_event(From, ClearMetrics, Data) -> @@ -461,7 +487,7 @@ start_resource(Data, From) -> %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. UpdatedData = Data#data{error = Reason}, - Actions = maybe_reply([], From, Err), + Actions = maybe_reply(retry_actions(UpdatedData), From, Err), {next_state, disconnected, UpdatedData, Actions} end. From f0c2b53868f06e52d9ec6e27da687cd42823dcfe Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 22 Aug 2022 10:31:26 +0800 Subject: [PATCH 5/5] fix(bpapi): make bpapi static_checks happy --- .../src/emqx_connector_schema_lib.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 3 ++- .../src/emqx_resource_manager.erl | 18 +----------------- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index f37640538..53643c9f9 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -104,7 +104,7 @@ username(_) -> undefined. password(type) -> binary(); password(desc) -> ?DESC("password"); password(required) -> false; -password(format) -> <<"format">>; +password(format) -> <<"password">>; password(_) -> undefined. auto_reconnect(type) -> boolean(); diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index bb8cb02bf..3f2cac46b 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -50,7 +50,8 @@ auto_retry_interval => integer(), %%======================================= Deprecated Opts End worker_pool_size => pos_integer(), - health_check_interval => pos_integer(), + %% use `integer()` compatibility to release 5.0.0 bpapi + health_check_interval => integer(), %% We can choose to block the return of emqx_resource:start until %% the resource connected, wait max to `start_timeout` ms. start_timeout => pos_integer(), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 226f9e927..2f6964380 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -385,30 +385,14 @@ handle_event(EventType, EventData, State, Data) -> insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> case get_owner(ResId) of not_found -> - ?SLOG( - debug, - #{ - msg => resource_owner_not_found, - resource_id => ResId, - action => auto_insert_cache - } - ), ets:insert(?ETS_TABLE, {ResId, Group, Data}); MgrId -> - ?SLOG( - debug, - #{ - msg => resource_owner_matched, - resource_id => ResId, - action => reinsert_cache - } - ), ets:insert(?ETS_TABLE, {ResId, Group, Data}); _ -> ?SLOG(error, #{ msg => get_resource_owner_failed, resource_id => ResId, - action => quit_rusource + action => quit_resource }), self() ! quit end.