From c7528e9b35c0f55a76bc1c008d5c14d8a3d0f44d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 29 May 2023 14:06:25 +0300 Subject: [PATCH] feat(mqttconn): add `pool_size` config parameter That currently tunes the number of MQTT clients employed both for subscriptions (if shared subscription is used) and for publishing to a remote broker. --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 2 ++ apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 1 + apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl | 3 +++ 3 files changed, 6 insertions(+) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 09228254a..0658c28a7 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -177,6 +177,7 @@ mk_worker_opts( ResourceId, #{ server := Server, + pool_size := PoolSize, proto_ver := ProtoVer, bridge_mode := BridgeMode, clean_start := CleanStart, @@ -188,6 +189,7 @@ mk_worker_opts( ) -> Options = #{ server => Server, + pool_size => PoolSize, %% 30s connect_timeout => 30, proto_ver => ProtoVer, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index a30de141e..f02fd19ad 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -73,6 +73,7 @@ fields("server_configs") -> } )}, {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {reconnect_interval, mk(string(), #{deprecated => {since, "v5.0.16"}})}, {proto_ver, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index ede477602..223d4d058 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -46,6 +46,7 @@ -type options() :: #{ % endpoint server := iodata(), + pool_size := pos_integer(), % emqtt client options proto_ver := v3 | v4 | v5, username := binary(), @@ -66,6 +67,7 @@ -type client_option() :: emqtt:option() + | {pool_size, pos_integer()} | {name, name()} | {ingress, ingress() | undefined}. @@ -191,6 +193,7 @@ mk_client_options(Name, Ingress, BridgeOpts) -> end, Opts = maps:with( [ + pool_size, proto_ver, username, password,