perf: add broker_pool_size, generic_pool_size and channel_cleanup_batch_size config options
Tuning these options can improve performance if cluster interconnect network latency is high. Fixes: EMQX-10661
This commit is contained in:
parent
ed28c12a66
commit
466fe7e009
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx, [
|
{application, emqx, [
|
||||||
{id, "emqx"},
|
{id, "emqx"},
|
||||||
{description, "EMQX Core"},
|
{description, "EMQX Core"},
|
||||||
{vsn, "5.1.4"},
|
{vsn, "5.1.5"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -31,7 +31,7 @@ start_link() ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% Broker pool
|
%% Broker pool
|
||||||
PoolSize = emqx_vm:schedulers() * 2,
|
PoolSize = emqx:get_config([node, broker_pool_size], emqx_vm:schedulers() * 2),
|
||||||
BrokerPool = emqx_pool_sup:spec([
|
BrokerPool = emqx_pool_sup:spec([
|
||||||
broker_pool,
|
broker_pool,
|
||||||
hash,
|
hash,
|
||||||
|
|
|
@ -685,7 +685,8 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||||
?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}),
|
?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}),
|
||||||
ChanPids = [Pid | emqx_utils:drain_down(?BATCH_SIZE)],
|
BatchSize = emqx:get_config([node, channel_cleanup_batch_size], ?BATCH_SIZE),
|
||||||
|
ChanPids = [Pid | emqx_utils:drain_down(BatchSize)],
|
||||||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||||
lists:foreach(fun mark_channel_disconnected/1, ChanPids),
|
lists:foreach(fun mark_channel_disconnected/1, ChanPids),
|
||||||
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
|
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
|
||||||
|
|
|
@ -31,7 +31,9 @@ init([]) ->
|
||||||
%% always start emqx_config_handler first to load the emqx.conf to emqx_config
|
%% always start emqx_config_handler first to load the emqx.conf to emqx_config
|
||||||
[
|
[
|
||||||
child_spec(emqx_config_handler, worker),
|
child_spec(emqx_config_handler, worker),
|
||||||
child_spec(emqx_pool_sup, supervisor),
|
child_spec(emqx_pool_sup, supervisor, [
|
||||||
|
emqx:get_config([node, generic_pool_size], emqx_vm:schedulers())
|
||||||
|
]),
|
||||||
child_spec(emqx_hooks, worker),
|
child_spec(emqx_hooks, worker),
|
||||||
child_spec(emqx_stats, worker),
|
child_spec(emqx_stats, worker),
|
||||||
child_spec(emqx_metrics, worker),
|
child_spec(emqx_metrics, worker),
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
|
start_link/1,
|
||||||
start_link/3,
|
start_link/3,
|
||||||
start_link/4
|
start_link/4
|
||||||
]).
|
]).
|
||||||
|
@ -51,6 +52,9 @@ spec(ChildId, Args) ->
|
||||||
start_link() ->
|
start_link() ->
|
||||||
start_link(?POOL, random, {?POOL, start_link, []}).
|
start_link(?POOL, random, {?POOL, start_link, []}).
|
||||||
|
|
||||||
|
start_link(PoolSize) ->
|
||||||
|
start_link(?POOL, random, PoolSize, {?POOL, start_link, []}).
|
||||||
|
|
||||||
-spec start_link(atom() | tuple(), atom(), mfargs()) ->
|
-spec start_link(atom() | tuple(), atom(), mfargs()) ->
|
||||||
{ok, pid()} | {error, term()}.
|
{ok, pid()} | {error, term()}.
|
||||||
start_link(Pool, Type, MFA) ->
|
start_link(Pool, Type, MFA) ->
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_conf, [
|
{application, emqx_conf, [
|
||||||
{description, "EMQX configuration management"},
|
{description, "EMQX configuration management"},
|
||||||
{vsn, "0.1.24"},
|
{vsn, "0.1.25"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_conf_app, []}},
|
{mod, {emqx_conf_app, []}},
|
||||||
{applications, [kernel, stdlib, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx_ctl]},
|
||||||
|
|
|
@ -672,6 +672,35 @@ fields("node") ->
|
||||||
mapping => "emqx_machine.custom_shard_transports",
|
mapping => "emqx_machine.custom_shard_transports",
|
||||||
default => #{}
|
default => #{}
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{"broker_pool_size",
|
||||||
|
sc(
|
||||||
|
pos_integer(),
|
||||||
|
#{
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
default => emqx_vm:schedulers() * 2,
|
||||||
|
'readOnly' => true,
|
||||||
|
desc => ?DESC(node_broker_pool_size)
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{"generic_pool_size",
|
||||||
|
sc(
|
||||||
|
pos_integer(),
|
||||||
|
#{
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
default => emqx_vm:schedulers(),
|
||||||
|
'readOnly' => true,
|
||||||
|
desc => ?DESC(node_generic_pool_size)
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{"channel_cleanup_batch_size",
|
||||||
|
sc(
|
||||||
|
pos_integer(),
|
||||||
|
#{
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
default => 100_000,
|
||||||
|
desc => ?DESC(node_channel_cleanup_batch_size)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields("cluster_call") ->
|
fields("cluster_call") ->
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Add `node.broker_pool_size`, `node.generic_pool_size`, `node.channel_cleanup_batch_size` options to EMQX configuration.
|
||||||
|
|
||||||
|
Tuning these options can significantly improve performance if cluster interconnect network latency is high.
|
|
@ -776,4 +776,32 @@ the default is to use the value set in <code>db.default_shard_transport</code>."
|
||||||
db_shard_transports.label:
|
db_shard_transports.label:
|
||||||
"""Shard Transports"""
|
"""Shard Transports"""
|
||||||
|
|
||||||
|
node_broker_pool_size.desc:
|
||||||
|
"""The number of workers in emqx_broker pool. Increasing this value may improve performance
|
||||||
|
by enhancing parallelism, especially when EMQX cluster interconnect network latency is high.
|
||||||
|
Defaults to the number of Erlang schedulers (CPU cores) * 2.
|
||||||
|
"""
|
||||||
|
|
||||||
|
node_broker_pool_size.label:
|
||||||
|
"""Node Broker Pool Size"""
|
||||||
|
|
||||||
|
node_generic_pool_size.desc:
|
||||||
|
"""The number of workers in emqx_pool. Increasing this value may improve performance
|
||||||
|
by enhancing parallelism, especially when EMQX cluster interconnect network latency is high.
|
||||||
|
Defaults to the number of Erlang schedulers (CPU cores).
|
||||||
|
"""
|
||||||
|
|
||||||
|
node_generic_pool_size.label:
|
||||||
|
"""Node Generic Pool Size"""
|
||||||
|
|
||||||
|
node_channel_cleanup_batch_size.desc:
|
||||||
|
"""The size of the channel cleanup batch. if EMQX cluster interconnect network latency is high,
|
||||||
|
reducing this value together with increasing node.generic_pool_size may improve performance
|
||||||
|
during an abrupt disconnect of a large numbers of clients.
|
||||||
|
Defaults to 100000.
|
||||||
|
"""
|
||||||
|
|
||||||
|
node_channel_cleanup_batch_size.label:
|
||||||
|
"""Node Channel Cleanup Batch Size"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue