From f52cc93d9d5499c9a1affa59e9c5e270ed8c7a6f Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 16 Jan 2024 17:09:24 +0200 Subject: [PATCH] perf(emqx_cm): use a dedicated pool for channel cleanup This is to isolate channels cleanup from other async tasks (like routes cleanup), as channels cleanup can be quite slow under high network latency conditions. Fixes: EMQX-11743 --- apps/emqx/include/emqx_cm.hrl | 2 + apps/emqx/src/emqx_cm.erl | 6 ++- apps/emqx/src/emqx_cm_sup.erl | 4 ++ apps/emqx/src/emqx_pool.erl | 64 +++++++++++++++++++++++--------- apps/emqx/test/emqx_cm_SUITE.erl | 10 +++-- changes/ce/perf-12336.en.md | 2 + 6 files changed, 66 insertions(+), 22 deletions(-) create mode 100644 changes/ce/perf-12336.en.md diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl index ae70f131f..6478a6162 100644 --- a/apps/emqx/include/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -30,4 +30,6 @@ -define(T_GET_INFO, 5_000). -define(T_TAKEOVER, 15_000). +-define(CM_POOL, emqx_cm_pool). + -endif. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 660ac3cfe..2e6714e7f 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -670,7 +670,11 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} ChanPids = [Pid | emqx_utils:drain_down(BatchSize)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), lists:foreach(fun mark_channel_disconnected/1, ChanPids), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), + ok = emqx_pool:async_submit_to_pool( + ?CM_POOL, + fun lists:foreach/2, + [fun ?MODULE:clean_down/1, Items] + ), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index e7420b4da..622921f1d 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -25,6 +25,8 @@ %% for test -export([restart_flapping/0]). +-include("emqx_cm.hrl"). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -45,6 +47,7 @@ init([]) -> Banned = child_spec(emqx_banned, 1000, worker), Flapping = child_spec(emqx_flapping, 1000, worker), Locker = child_spec(emqx_cm_locker, 5000, worker), + CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), Registry = child_spec(emqx_cm_registry, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), @@ -53,6 +56,7 @@ init([]) -> Banned, Flapping, Locker, + CmPool, Registry, Manager, DSSessionGCSup diff --git a/apps/emqx/src/emqx_pool.erl b/apps/emqx/src/emqx_pool.erl index 1cb5f429c..39c585133 100644 --- a/apps/emqx/src/emqx_pool.erl +++ b/apps/emqx/src/emqx_pool.erl @@ -28,11 +28,15 @@ submit/1, submit/2, async_submit/1, - async_submit/2 + async_submit/2, + submit_to_pool/2, + submit_to_pool/3, + async_submit_to_pool/2, + async_submit_to_pool/3 ]). -ifdef(TEST). --export([worker/0, flush_async_tasks/0]). +-export([worker/0, flush_async_tasks/0, flush_async_tasks/1]). -endif. %% gen_server callbacks @@ -57,7 +61,7 @@ -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_utils:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(Pool, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] @@ -66,32 +70,48 @@ start_link(Pool, Id) -> %% @doc Submit work to the pool. -spec submit(task()) -> any(). submit(Task) -> - call({submit, Task}). + submit_to_pool(?POOL, Task). -spec submit(fun(), list(any())) -> any(). submit(Fun, Args) -> - call({submit, {Fun, Args}}). - -%% @private -call(Req) -> - gen_server:call(worker(), Req, infinity). + submit_to_pool(?POOL, Fun, Args). %% @doc Submit work to the pool asynchronously. -spec async_submit(task()) -> ok. async_submit(Task) -> - cast({async_submit, Task}). + async_submit_to_pool(?POOL, Task). -spec async_submit(fun(), list(any())) -> ok. async_submit(Fun, Args) -> - cast({async_submit, {Fun, Args}}). + async_submit_to_pool(?POOL, Fun, Args). + +-spec submit_to_pool(any(), task()) -> any(). +submit_to_pool(Pool, Task) -> + call(Pool, {submit, Task}). + +-spec submit_to_pool(any(), fun(), list(any())) -> any(). +submit_to_pool(Pool, Fun, Args) -> + call(Pool, {submit, {Fun, Args}}). + +-spec async_submit_to_pool(any(), task()) -> ok. +async_submit_to_pool(Pool, Task) -> + cast(Pool, {async_submit, Task}). + +-spec async_submit_to_pool(any(), fun(), list(any())) -> ok. +async_submit_to_pool(Pool, Fun, Args) -> + cast(Pool, {async_submit, {Fun, Args}}). %% @private -cast(Msg) -> - gen_server:cast(worker(), Msg). +call(Pool, Req) -> + gen_server:call(worker(Pool), Req, infinity). %% @private -worker() -> - gproc_pool:pick_worker(?POOL). +cast(Pool, Msg) -> + gen_server:cast(worker(Pool), Msg). + +%% @private +worker(Pool) -> + gproc_pool:pick_worker(Pool). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -146,15 +166,25 @@ run(Fun) when is_function(Fun) -> Fun(). -ifdef(TEST). + +worker() -> + worker(?POOL). + +flush_async_tasks() -> + flush_async_tasks(?POOL). + %% This help function creates a large enough number of async tasks %% to force flush the pool workers. %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. -flush_async_tasks() -> +flush_async_tasks(Pool) -> Ref = make_ref(), Self = self(), L = lists:seq(1, 997), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L), + lists:foreach( + fun(I) -> emqx_pool:async_submit_to_pool(Pool, fun() -> Self ! {done, Ref, I} end, []) end, + L + ), lists:foreach( fun(I) -> receive diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 4ecea9a4b..e175b4349 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -221,7 +221,7 @@ t_open_session_race_condition(_) -> end, %% sync ignored = gen_server:call(?CM, ignore, infinity), - ok = emqx_pool:flush_async_tasks(), + ok = emqx_pool:flush_async_tasks(?CM_POOL), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> @@ -343,7 +343,7 @@ test_stepdown_session(Action, Reason) -> end, % sync ignored = gen_server:call(?CM, ignore, infinity), - ok = flush_emqx_pool(), + ok = flush_emqx_cm_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). %% Channel deregistration is delegated to emqx_pool as a sync tasks. @@ -353,10 +353,12 @@ test_stepdown_session(Action, Reason) -> %% to sync with the pool workers. %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. -flush_emqx_pool() -> +flush_emqx_cm_pool() -> Self = self(), L = lists:seq(1, 1000), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach( + fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L + ), lists:foreach( fun(I) -> receive diff --git a/changes/ce/perf-12336.en.md b/changes/ce/perf-12336.en.md new file mode 100644 index 000000000..5c385e6b6 --- /dev/null +++ b/changes/ce/perf-12336.en.md @@ -0,0 +1,2 @@ +Isolate channels cleanup from other async tasks (like routes cleanup) by using a dedicated pool, +as this task can be quite slow under high network latency conditions.