Merge pull request #12336 from SergeTupchiy/EMQX-11743-dedicated-channel-cleanup-pool

perf(emqx_cm): use a dedicated pool for channel cleanup
This commit is contained in:
SergeTupchiy 2024-01-16 19:38:15 +02:00 committed by GitHub
commit ed78f2488b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 22 deletions

View File

@ -30,4 +30,6 @@
-define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000).
-define(CM_POOL, emqx_cm_pool).
-endif.

View File

@ -670,7 +670,11 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
ChanPids = [Pid | emqx_utils:drain_down(BatchSize)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(fun mark_channel_disconnected/1, ChanPids),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
ok = emqx_pool:async_submit_to_pool(
?CM_POOL,
fun lists:foreach/2,
[fun ?MODULE:clean_down/1, Items]
),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),

View File

@ -25,6 +25,8 @@
%% for test
-export([restart_flapping/0]).
-include("emqx_cm.hrl").
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -45,6 +47,7 @@ init([]) ->
Banned = child_spec(emqx_banned, 1000, worker),
Flapping = child_spec(emqx_flapping, 1000, worker),
Locker = child_spec(emqx_cm_locker, 5000, worker),
CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]),
Registry = child_spec(emqx_cm_registry, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
@ -53,6 +56,7 @@ init([]) ->
Banned,
Flapping,
Locker,
CmPool,
Registry,
Manager,
DSSessionGCSup

View File

@ -28,11 +28,15 @@
submit/1,
submit/2,
async_submit/1,
async_submit/2
async_submit/2,
submit_to_pool/2,
submit_to_pool/3,
async_submit_to_pool/2,
async_submit_to_pool/3
]).
-ifdef(TEST).
-export([worker/0, flush_async_tasks/0]).
-export([worker/0, flush_async_tasks/0, flush_async_tasks/1]).
-endif.
%% gen_server callbacks
@ -57,7 +61,7 @@
-spec start_link(atom(), pos_integer()) -> startlink_ret().
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_utils:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(Pool, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]
@ -66,32 +70,48 @@ start_link(Pool, Id) ->
%% @doc Submit work to the pool.
-spec submit(task()) -> any().
submit(Task) ->
call({submit, Task}).
submit_to_pool(?POOL, Task).
-spec submit(fun(), list(any())) -> any().
submit(Fun, Args) ->
call({submit, {Fun, Args}}).
%% @private
call(Req) ->
gen_server:call(worker(), Req, infinity).
submit_to_pool(?POOL, Fun, Args).
%% @doc Submit work to the pool asynchronously.
-spec async_submit(task()) -> ok.
async_submit(Task) ->
cast({async_submit, Task}).
async_submit_to_pool(?POOL, Task).
-spec async_submit(fun(), list(any())) -> ok.
async_submit(Fun, Args) ->
cast({async_submit, {Fun, Args}}).
async_submit_to_pool(?POOL, Fun, Args).
%% @doc Submit a task to a worker of the given pool and wait for the reply.
%% The underlying call uses an `infinity' timeout, so the caller blocks
%% until the worker answers.
-spec submit_to_pool(any(), task()) -> any().
submit_to_pool(Pool, Task) ->
    Request = {submit, Task},
    call(Pool, Request).
%% @doc Same as `submit_to_pool/2', but the task is given as a function
%% plus its argument list; they are packed into a `{Fun, Args}' task.
-spec submit_to_pool(any(), fun(), list(any())) -> any().
submit_to_pool(Pool, Fun, Args) ->
    Task = {Fun, Args},
    call(Pool, {submit, Task}).
%% @doc Hand a task over to a worker of the given pool without waiting
%% for it to run. The request is delivered with a gen_server cast.
-spec async_submit_to_pool(any(), task()) -> ok.
async_submit_to_pool(Pool, Task) ->
    Message = {async_submit, Task},
    cast(Pool, Message).
%% @doc Same as `async_submit_to_pool/2', but the task is given as a
%% function plus its argument list, packed into a `{Fun, Args}' task.
-spec async_submit_to_pool(any(), fun(), list(any())) -> ok.
async_submit_to_pool(Pool, Fun, Args) ->
    Task = {Fun, Args},
    cast(Pool, {async_submit, Task}).
%% @private
cast(Msg) ->
gen_server:cast(worker(), Msg).
call(Pool, Req) ->
    %% Choose a worker of Pool first, then issue a blocking request to it.
    %% The `infinity' timeout means the caller waits until the worker replies.
    Worker = worker(Pool),
    gen_server:call(Worker, Req, infinity).
%% @private
worker() ->
gproc_pool:pick_worker(?POOL).
cast(Pool, Msg) ->
    %% Choose a worker of Pool and send it the message without waiting
    %% for any reply.
    Worker = worker(Pool),
    gen_server:cast(Worker, Msg).
%% @private
worker(Pool) ->
%% Worker selection is delegated entirely to gproc_pool for the given pool.
%% NOTE(review): the selection strategy is presumably the one the pool was
%% created with -- confirm against the pool supervisor spec.
gproc_pool:pick_worker(Pool).
%%--------------------------------------------------------------------
%% gen_server callbacks
@ -146,15 +166,25 @@ run(Fun) when is_function(Fun) ->
Fun().
-ifdef(TEST).
%% Test-only convenience: pick a worker from the default pool (?POOL).
worker() ->
worker(?POOL).
%% Test-only convenience: flush the async tasks of the default pool (?POOL).
flush_async_tasks() ->
flush_async_tasks(?POOL).
%% This helper function creates a large enough number of async tasks
%% to force a flush of the pool workers.
%% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks.
flush_async_tasks() ->
flush_async_tasks(Pool) ->
Ref = make_ref(),
Self = self(),
L = lists:seq(1, 997),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L),
lists:foreach(
fun(I) -> emqx_pool:async_submit_to_pool(Pool, fun() -> Self ! {done, Ref, I} end, []) end,
L
),
lists:foreach(
fun(I) ->
receive

View File

@ -221,7 +221,7 @@ t_open_session_race_condition(_) ->
end,
%% sync
ignored = gen_server:call(?CM, ignore, infinity),
ok = emqx_pool:flush_async_tasks(),
ok = emqx_pool:flush_async_tasks(?CM_POOL),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) ->
@ -343,7 +343,7 @@ test_stepdown_session(Action, Reason) ->
end,
% sync
ignored = gen_server:call(?CM, ignore, infinity),
ok = flush_emqx_pool(),
ok = flush_emqx_cm_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
%% Channel deregistration is delegated to emqx_pool as async tasks.
@ -353,10 +353,12 @@ test_stepdown_session(Action, Reason) ->
%% to sync with the pool workers.
%% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks.
flush_emqx_pool() ->
flush_emqx_cm_pool() ->
Self = self(),
L = lists:seq(1, 1000),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
lists:foreach(
fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L
),
lists:foreach(
fun(I) ->
receive

View File

@ -0,0 +1,2 @@
Isolate channel cleanup from other async tasks (such as route cleanup) by using a dedicated pool,
as this task can be quite slow under high network latency conditions.