diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl index ae70f131f..6478a6162 100644 --- a/apps/emqx/include/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -30,4 +30,6 @@ -define(T_GET_INFO, 5_000). -define(T_TAKEOVER, 15_000). +-define(CM_POOL, emqx_cm_pool). + -endif. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 660ac3cfe..2e6714e7f 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -670,7 +670,11 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} ChanPids = [Pid | emqx_utils:drain_down(BatchSize)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), lists:foreach(fun mark_channel_disconnected/1, ChanPids), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), + ok = emqx_pool:async_submit_to_pool( + ?CM_POOL, + fun lists:foreach/2, + [fun ?MODULE:clean_down/1, Items] + ), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index e7420b4da..622921f1d 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -25,6 +25,8 @@ %% for test -export([restart_flapping/0]). +-include("emqx_cm.hrl"). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -45,6 +47,7 @@ init([]) -> Banned = child_spec(emqx_banned, 1000, worker), Flapping = child_spec(emqx_flapping, 1000, worker), Locker = child_spec(emqx_cm_locker, 5000, worker), + CmPool = emqx_pool_sup:spec(emqx_cm_pool_sup, [?CM_POOL, random, {emqx_pool, start_link, []}]), Registry = child_spec(emqx_cm_registry, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), @@ -53,6 +56,7 @@ init([]) -> Banned, Flapping, Locker, + CmPool, Registry, Manager, DSSessionGCSup diff --git a/apps/emqx/src/emqx_pool.erl b/apps/emqx/src/emqx_pool.erl index 1cb5f429c..39c585133 100644 --- a/apps/emqx/src/emqx_pool.erl +++ b/apps/emqx/src/emqx_pool.erl @@ -28,11 +28,15 @@ submit/1, submit/2, async_submit/1, - async_submit/2 + async_submit/2, + submit_to_pool/2, + submit_to_pool/3, + async_submit_to_pool/2, + async_submit_to_pool/3 ]). -ifdef(TEST). --export([worker/0, flush_async_tasks/0]). +-export([worker/0, flush_async_tasks/0, flush_async_tasks/1]). -endif. %% gen_server callbacks @@ -57,7 +61,7 @@ -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_utils:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(Pool, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] @@ -66,32 +70,48 @@ start_link(Pool, Id) -> %% @doc Submit work to the pool. -spec submit(task()) -> any(). submit(Task) -> - call({submit, Task}). + submit_to_pool(?POOL, Task). -spec submit(fun(), list(any())) -> any(). submit(Fun, Args) -> - call({submit, {Fun, Args}}). - -%% @private -call(Req) -> - gen_server:call(worker(), Req, infinity). + submit_to_pool(?POOL, Fun, Args). %% @doc Submit work to the pool asynchronously. -spec async_submit(task()) -> ok. async_submit(Task) -> - cast({async_submit, Task}). + async_submit_to_pool(?POOL, Task). -spec async_submit(fun(), list(any())) -> ok. async_submit(Fun, Args) -> - cast({async_submit, {Fun, Args}}). + async_submit_to_pool(?POOL, Fun, Args). + +-spec submit_to_pool(any(), task()) -> any(). +submit_to_pool(Pool, Task) -> + call(Pool, {submit, Task}). + +-spec submit_to_pool(any(), fun(), list(any())) -> any(). +submit_to_pool(Pool, Fun, Args) -> + call(Pool, {submit, {Fun, Args}}). + +-spec async_submit_to_pool(any(), task()) -> ok. +async_submit_to_pool(Pool, Task) -> + cast(Pool, {async_submit, Task}). + +-spec async_submit_to_pool(any(), fun(), list(any())) -> ok. +async_submit_to_pool(Pool, Fun, Args) -> + cast(Pool, {async_submit, {Fun, Args}}). %% @private -cast(Msg) -> - gen_server:cast(worker(), Msg). +call(Pool, Req) -> + gen_server:call(worker(Pool), Req, infinity). %% @private -worker() -> - gproc_pool:pick_worker(?POOL). +cast(Pool, Msg) -> + gen_server:cast(worker(Pool), Msg). + +%% @private +worker(Pool) -> + gproc_pool:pick_worker(Pool). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -146,15 +166,25 @@ run(Fun) when is_function(Fun) -> Fun(). -ifdef(TEST). + +worker() -> + worker(?POOL). + +flush_async_tasks() -> + flush_async_tasks(?POOL). + %% This help function creates a large enough number of async tasks %% to force flush the pool workers. %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. -flush_async_tasks() -> +flush_async_tasks(Pool) -> Ref = make_ref(), Self = self(), L = lists:seq(1, 997), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, Ref, I} end, []) end, L), + lists:foreach( + fun(I) -> emqx_pool:async_submit_to_pool(Pool, fun() -> Self ! {done, Ref, I} end, []) end, + L + ), lists:foreach( fun(I) -> receive diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 4ecea9a4b..e175b4349 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -221,7 +221,7 @@ t_open_session_race_condition(_) -> end, %% sync ignored = gen_server:call(?CM, ignore, infinity), - ok = emqx_pool:flush_async_tasks(), + ok = emqx_pool:flush_async_tasks(?CM_POOL), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> @@ -343,7 +343,7 @@ test_stepdown_session(Action, Reason) -> end, % sync ignored = gen_server:call(?CM, ignore, infinity), - ok = flush_emqx_pool(), + ok = flush_emqx_cm_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). %% Channel deregistration is delegated to emqx_pool as a sync tasks. @@ -353,10 +353,12 @@ test_stepdown_session(Action, Reason) -> %% to sync with the pool workers. %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. -flush_emqx_pool() -> +flush_emqx_cm_pool() -> Self = self(), L = lists:seq(1, 1000), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach( + fun(I) -> emqx_pool:async_submit_to_pool(?CM_POOL, fun() -> Self ! {done, I} end, []) end, L + ), lists:foreach( fun(I) -> receive diff --git a/changes/ce/perf-12336.en.md b/changes/ce/perf-12336.en.md new file mode 100644 index 000000000..5c385e6b6 --- /dev/null +++ b/changes/ce/perf-12336.en.md @@ -0,0 +1,2 @@ +Isolate channels cleanup from other async tasks (like routes cleanup) by using a dedicated pool, +as this task can be quite slow under high network latency conditions.