From 466fe7e0099bf76d6ebb92c7865847cf63a3cdd2 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 3 Aug 2023 17:49:47 +0300 Subject: [PATCH] perf: add broker_pool_size, generic_pool_size and channel_cleanup_batch_size config options Tuning these options can improve performance if cluster interconnect network latency is high. Fixes: EMQX-10661 --- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_broker_sup.erl | 2 +- apps/emqx/src/emqx_cm.erl | 3 ++- apps/emqx/src/emqx_kernel_sup.erl | 4 +++- apps/emqx/src/emqx_pool_sup.erl | 4 ++++ apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_conf/src/emqx_conf_schema.erl | 29 +++++++++++++++++++++++++ changes/ce/feat-11390.en.md | 3 +++ rel/i18n/emqx_conf_schema.hocon | 28 ++++++++++++++++++++++++ 9 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 changes/ce/feat-11390.en.md diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index cff8cf35b..d9598ee1b 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.1.4"}, + {vsn, "5.1.5"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_broker_sup.erl b/apps/emqx/src/emqx_broker_sup.erl index a43ee771f..ac2fe587c 100644 --- a/apps/emqx/src/emqx_broker_sup.erl +++ b/apps/emqx/src/emqx_broker_sup.erl @@ -31,7 +31,7 @@ start_link() -> init([]) -> %% Broker pool - PoolSize = emqx_vm:schedulers() * 2, + PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2), BrokerPool = emqx_pool_sup:spec([ broker_pool, hash, diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c9fb93ceb..c680560fb 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -685,7 +685,8 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}), - ChanPids = [Pid | emqx_utils:drain_down(?BATCH_SIZE)], + BatchSize = emqx:get_config([node, channel_cleanup_batch_size], ?BATCH_SIZE), + ChanPids = [Pid | emqx_utils:drain_down(BatchSize)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), lists:foreach(fun mark_channel_disconnected/1, ChanPids), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 45451084a..85724b9b4 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -31,7 +31,9 @@ init([]) -> %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker), - child_spec(emqx_pool_sup, supervisor), + child_spec(emqx_pool_sup, supervisor, [ + emqx:get_config([node, generic_pool_size], emqx_vm:schedulers()) + ]), child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), diff --git a/apps/emqx/src/emqx_pool_sup.erl b/apps/emqx/src/emqx_pool_sup.erl index aadd1895a..7c4f68d03 100644 --- a/apps/emqx/src/emqx_pool_sup.erl +++ b/apps/emqx/src/emqx_pool_sup.erl @@ -24,6 +24,7 @@ -export([ start_link/0, + start_link/1, start_link/3, start_link/4 ]). @@ -51,6 +52,9 @@ spec(ChildId, Args) -> start_link() -> start_link(?POOL, random, {?POOL, start_link, []}). +start_link(PoolSize) -> + start_link(?POOL, random, PoolSize, {?POOL, start_link, []}). + -spec start_link(atom() | tuple(), atom(), mfargs()) -> {ok, pid()} | {error, term()}. start_link(Pool, Type, MFA) -> diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 3c1e5592f..86fb169a6 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.24"}, + {vsn, "0.1.25"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 464d1abef..eea2bf1b8 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -672,6 +672,35 @@ fields("node") -> mapping => "emqx_machine.custom_shard_transports", default => #{} } + )}, + {"broker_pool_size", + sc( + pos_integer(), + #{ + importance => ?IMPORTANCE_HIDDEN, + default => emqx_vm:schedulers() * 2, + 'readOnly' => true, + desc => ?DESC(node_broker_pool_size) + } + )}, + {"generic_pool_size", + sc( + pos_integer(), + #{ + importance => ?IMPORTANCE_HIDDEN, + default => emqx_vm:schedulers(), + 'readOnly' => true, + desc => ?DESC(node_generic_pool_size) + } + )}, + {"channel_cleanup_batch_size", + sc( + pos_integer(), + #{ + importance => ?IMPORTANCE_HIDDEN, + default => 100_000, + desc => ?DESC(node_channel_cleanup_batch_size) + } )} ]; fields("cluster_call") -> diff --git a/changes/ce/feat-11390.en.md b/changes/ce/feat-11390.en.md new file mode 100644 index 000000000..e0fa9a212 --- /dev/null +++ b/changes/ce/feat-11390.en.md @@ -0,0 +1,3 @@ +Add `node.broker_pool_size`, `node.generic_pool_size`, `node.channel_cleanup_batch_size` options to EMQX configuration. + +Tuning these options can significantly improve performance if cluster interconnect network latency is high. diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index bc45fa009..46d2817ca 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -776,4 +776,32 @@ the default is to use the value set in db.default_shard_transport." db_shard_transports.label: """Shard Transports""" +node_broker_pool_size.desc: +"""The number of workers in emqx_broker pool. Increasing this value may improve performance +by enhancing parallelism, especially when EMQX cluster interconnect network latency is high. +Defaults to the number of Erlang schedulers (CPU cores) * 2. +""" + +node_broker_pool_size.label: +"""Node Broker Pool Size""" + +node_generic_pool_size.desc: +"""The number of workers in emqx_pool. Increasing this value may improve performance +by enhancing parallelism, especially when EMQX cluster interconnect network latency is high. +Defaults to the number of Erlang schedulers (CPU cores). +""" + +node_generic_pool_size.label: +"""Node Generic Pool Size""" + +node_channel_cleanup_batch_size.desc: +"""The size of the channel cleanup batch. if EMQX cluster interconnect network latency is high, +reducing this value together with increasing node.generic_pool_size may improve performance +during an abrupt disconnect of a large numbers of clients. +Defaults to 100000. +""" + +node_channel_cleanup_batch_size.label: +"""Node Channel Cleanup Batch Size""" + }