feat(mqttconn): add `pool_size` config parameter

That currently tunes the number of MQTT clients employed both for
subscriptions (if shared subscription is used) and for publishing to
a remote broker.
This commit is contained in:
Andrew Mayorov 2023-05-29 14:06:25 +03:00
parent 6e97dffdb8
commit c7528e9b35
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 6 additions and 0 deletions

View File

@ -177,6 +177,7 @@ mk_worker_opts(
ResourceId, ResourceId,
#{ #{
server := Server, server := Server,
pool_size := PoolSize,
proto_ver := ProtoVer, proto_ver := ProtoVer,
bridge_mode := BridgeMode, bridge_mode := BridgeMode,
clean_start := CleanStart, clean_start := CleanStart,
@ -188,6 +189,7 @@ mk_worker_opts(
) -> ) ->
Options = #{ Options = #{
server => Server, server => Server,
pool_size => PoolSize,
%% 30s %% 30s
connect_timeout => 30, connect_timeout => 30,
proto_ver => ProtoVer, proto_ver => ProtoVer,

View File

@ -73,6 +73,7 @@ fields("server_configs") ->
} }
)}, )},
{server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})},
{reconnect_interval, mk(string(), #{deprecated => {since, "v5.0.16"}})}, {reconnect_interval, mk(string(), #{deprecated => {since, "v5.0.16"}})},
{proto_ver, {proto_ver,

View File

@ -46,6 +46,7 @@
-type options() :: #{ -type options() :: #{
% endpoint % endpoint
server := iodata(), server := iodata(),
pool_size := pos_integer(),
% emqtt client options % emqtt client options
proto_ver := v3 | v4 | v5, proto_ver := v3 | v4 | v5,
username := binary(), username := binary(),
@ -66,6 +67,7 @@
-type client_option() :: -type client_option() ::
emqtt:option() emqtt:option()
| {pool_size, pos_integer()}
| {name, name()} | {name, name()}
| {ingress, ingress() | undefined}. | {ingress, ingress() | undefined}.
@ -191,6 +193,7 @@ mk_client_options(Name, Ingress, BridgeOpts) ->
end, end,
Opts = maps:with( Opts = maps:with(
[ [
pool_size,
proto_ver, proto_ver,
username, username,
password, password,