perf(emqx_cm): use a dedicated pool for channel cleanup

This isolates channel cleanup from other async tasks (such as route cleanup),
as channel cleanup can be quite slow under high network latency conditions.

Fixes: EMQX-11743
This commit is contained in:
Serge Tupchii 2024-01-16 17:09:24 +02:00 committed by zhongwencool
parent bd13540e23
commit f52cc93d9d
6 changed files with 66 additions and 22 deletions

View File

@ -30,4 +30,6 @@
-define(T_GET_INFO, 5_000). -define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000). -define(T_TAKEOVER, 15_000).
-define(CM_POOL, emqx_cm_pool).
-endif. -endif.

View File

@ -670,7 +670,11 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
ChanPids = [Pid | emqx_utils:drain_down(BatchSize)], ChanPids = [Pid | emqx_utils:drain_down(BatchSize)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(fun mark_channel_disconnected/1, ChanPids), lists:foreach(fun mark_channel_disconnected/1, ChanPids),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), ok = emqx_pool:async_submit_to_pool(
?CM_POOL,
fun lists:foreach/2,
[fun ?MODULE:clean_down/1, Items]
),
{noreply, State#{chan_pmon := PMon1}}; {noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),

View File

@ -25,6 +25,8 @@
%% for test %% for test
-export([restart_flapping/0]). -export([restart_flapping/0]).
-include("emqx_cm.hrl").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -45,6 +47,7 @@ init([]) ->
Banned = child_spec(emqx_banned, 1000, worker), Banned = child_spec(emqx_banned, 1000, worker),
Flapping = child_spec(emqx_flapping, 1000, worker), Flapping = child_spec(emqx_flapping, 1000, worker),
Locker = child_spec(emqx_cm_locker, 5000, worker), Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker), Registry = child_spec(emqx_cm_registry, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
@ -53,6 +56,7 @@ init([]) ->
Banned, Banned,
Flapping, Flapping,
Locker, Locker,
CmPool,
Registry, Registry,
Manager, Manager,
DSSessionGCSup DSSessionGCSup

View File

@ -28,11 +28,15 @@
submit/1, submit/1,
submit/2, submit/2,
async_submit/1, async_submit/1,
async_submit/2 async_submit/2,
submit_to_pool/2,
submit_to_pool/3,
async_submit_to_pool/2,
async_submit_to_pool/3
]). ]).
-ifdef(TEST). -ifdef(TEST).
-export([worker/0, flush_async_tasks/0]). -export([worker/0, flush_async_tasks/0, flush_async_tasks/1]).
-endif. -endif.
%% gen_server callbacks %% gen_server callbacks
@ -57,7 +61,7 @@
-spec start_link(atom(), pos_integer()) -> startlink_ret(). -spec start_link(atom(), pos_integer()) -> startlink_ret().
start_link(Pool, Id) -> start_link(Pool, Id) ->
gen_server:start_link( gen_server:start_link(
{local, emqx_utils:proc_name(?MODULE, Id)}, {local, emqx_utils:proc_name(Pool, Id)},
?MODULE, ?MODULE,
[Pool, Id], [Pool, Id],
[{hibernate_after, 1000}] [{hibernate_after, 1000}]
@ -66,32 +70,48 @@ start_link(Pool, Id) ->
%% @doc Submit work to the pool. %% @doc Submit work to the pool.
-spec submit(task()) -> any(). -spec submit(task()) -> any().
submit(Task) -> submit(Task) ->
call({submit, Task}). submit_to_pool(?POOL, Task).
-spec submit(fun(), list(any())) -> any(). -spec submit(fun(), list(any())) -> any().
submit(Fun, Args) -> submit(Fun, Args) ->
call({submit, {Fun, Args}}). submit_to_pool(?POOL, Fun, Args).
%% @private
call(Req) ->
gen_server:call(worker(), Req, infinity).
%% @doc Submit work to the pool asynchronously. %% @doc Submit work to the pool asynchronously.
-spec async_submit(task()) -> ok. -spec async_submit(task()) -> ok.
async_submit(Task) -> async_submit(Task) ->
cast({async_submit, Task}). async_submit_to_pool(?POOL, Task).
-spec async_submit(fun(), list(any())) -> ok. -spec async_submit(fun(), list(any())) -> ok.
async_submit(Fun, Args) -> async_submit(Fun, Args) ->
cast({async_submit, {Fun, Args}}). async_submit_to_pool(?POOL, Fun, Args).
%% @doc Submit a task to the given pool and wait for the reply.
%% A worker is picked from `Pool' and the task is sent to it as a
%% synchronous `{submit, Task}' call with an infinite timeout, so the
%% caller blocks until the worker replies.
-spec submit_to_pool(any(), task()) -> any().
submit_to_pool(Pool, Task) ->
call(Pool, {submit, Task}).
%% @doc Submit a function and its argument list to the given pool and wait
%% for the reply.
%% `Fun' and `Args' are wrapped into a `{Fun, Args}' task and sent to a
%% worker picked from `Pool' as a synchronous call with an infinite timeout.
-spec submit_to_pool(any(), fun(), list(any())) -> any().
submit_to_pool(Pool, Fun, Args) ->
call(Pool, {submit, {Fun, Args}}).
%% @doc Submit a task to the given pool asynchronously.
%% A worker is picked from `Pool' and the task is sent to it as an
%% `{async_submit, Task}' cast; the caller does not wait for the task to run.
-spec async_submit_to_pool(any(), task()) -> ok.
async_submit_to_pool(Pool, Task) ->
cast(Pool, {async_submit, Task}).
%% @doc Submit a function and its argument list to the given pool
%% asynchronously.
%% `Fun' and `Args' are wrapped into a `{Fun, Args}' task and cast to a
%% worker picked from `Pool'; the caller does not wait for the task to run.
-spec async_submit_to_pool(any(), fun(), list(any())) -> ok.
async_submit_to_pool(Pool, Fun, Args) ->
cast(Pool, {async_submit, {Fun, Args}}).
%% @private %% @private
cast(Msg) -> call(Pool, Req) ->
gen_server:cast(worker(), Msg). gen_server:call(worker(Pool), Req, infinity).
%% @private %% @private
worker() -> cast(Pool, Msg) ->
gproc_pool:pick_worker(?POOL). gen_server:cast(worker(Pool), Msg).
%% @private
worker(Pool) ->
gproc_pool:pick_worker(Pool).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -146,15 +166,25 @@ run(Fun) when is_function(Fun) ->
Fun(). Fun().
-ifdef(TEST). -ifdef(TEST).
worker() ->
worker(?POOL).
flush_async_tasks() ->
flush_async_tasks(?POOL).
%% This helper function creates a large enough number of async tasks %% This helper function creates a large enough number of async tasks
%% to force flush the pool workers. %% to force flush the pool workers.
%% The number of tasks should be large enough to ensure all workers have %% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks. %% the chance to work on at least one of the tasks.
flush_async_tasks() -> flush_async_tasks(Pool) ->
Ref = make_ref(), Ref = make_ref(),
Self = self(), Self = self(),
L = lists:seq(1, 997), L = lists:seq(1, 997),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L), lists:foreach(
fun(I) -> emqx_pool:async_submit_to_pool(Pool, fun() -> Self ! {done, Ref, I} end, []) end,
L
),
lists:foreach( lists:foreach(
fun(I) -> fun(I) ->
receive receive

View File

@ -221,7 +221,7 @@ t_open_session_race_condition(_) ->
end, end,
%% sync %% sync
ignored = gen_server:call(?CM, ignore, infinity), ignored = gen_server:call(?CM, ignore, infinity),
ok = emqx_pool:flush_async_tasks(), ok = emqx_pool:flush_async_tasks(?CM_POOL),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) -> t_kick_session_discard_normal(_) ->
@ -343,7 +343,7 @@ test_stepdown_session(Action, Reason) ->
end, end,
% sync % sync
ignored = gen_server:call(?CM, ignore, infinity), ignored = gen_server:call(?CM, ignore, infinity),
ok = flush_emqx_pool(), ok = flush_emqx_cm_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
%% Channel deregistration is delegated to emqx_pool as async tasks. %% Channel deregistration is delegated to emqx_pool as async tasks.
@ -353,10 +353,12 @@ test_stepdown_session(Action, Reason) ->
%% to sync with the pool workers. %% to sync with the pool workers.
%% The number of tasks should be large enough to ensure all workers have %% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks. %% the chance to work on at least one of the tasks.
flush_emqx_pool() -> flush_emqx_cm_pool() ->
Self = self(), Self = self(),
L = lists:seq(1, 1000), L = lists:seq(1, 1000),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), lists:foreach(
fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L
),
lists:foreach( lists:foreach(
fun(I) -> fun(I) ->
receive receive

View File

@ -0,0 +1,2 @@
Isolate channel cleanup from other async tasks (such as route cleanup) by using a dedicated pool,
as channel cleanup can be quite slow under high network latency conditions.