fix: create dedicated pool for retainer

This commit is contained in:
Shawn 2023-03-21 17:05:13 +08:00
parent 4f3f9d533c
commit 4bfd7dd14a
4 changed files with 47 additions and 13 deletions

View File

@ -16,5 +16,6 @@
-define(APP, emqx_retainer).
-define(TAB, ?APP).
-define(POOL, retainer_worker_pool).
-record(retained, {topic, msg, expiry_time}).

View File

@ -69,7 +69,7 @@ on_session_subscribed(_, _, #{share := ShareName}) when ShareName =/= undefined
ok;
on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) ->
case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of
true -> emqx_pool:async_submit(fun ?MODULE:dispatch/2, [self(), Topic]);
true -> emqx_pool:async_submit(?POOL, fun ?MODULE:dispatch/2, [self(), Topic]);
_ -> ok
end.

View File

@ -18,21 +18,36 @@
-behaviour(supervisor).
-export([start_link/1]).
-include("emqx_retainer.hrl").
-export([start_link/1]).
-export([init/1]).
start_link(Env) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]).
init([Env]) ->
{ok, {{one_for_one, 10, 3600},
[#{id => retainer,
start => {emqx_retainer, start_link, [Env]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_retainer]} || not is_managed_by_modules()]}}.
Retainer = #{
id => retainer,
start => {emqx_retainer, start_link, [Env]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_retainer]
},
WorkerPool = #{
id => ?POOL,
start => {emqx_pool_sup, start_link, [?POOL, random, {emqx_pool, start_link, []}]},
restart => permanent,
shutdown => 5000,
type => supervisor,
modules => [emqx_pool_sup]
},
ChildSpecs = case is_managed_by_modules() of
false -> [Retainer, WorkerPool];
true -> [WorkerPool]
end,
{ok, {{one_for_one, 10, 3600}, ChildSpecs}}.
-ifdef(EMQX_ENTERPRISE).

View File

@ -28,8 +28,10 @@
-export([ submit/1
, submit/2
, submit/3
, async_submit/1
, async_submit/2
, async_submit/3
]).
-ifdef(TEST).
@ -56,7 +58,7 @@
%% @doc Start pool.
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
gen_server:start_link({local, emqx_misc:proc_name(Pool, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%% @doc Submit work to the pool.
@ -68,9 +70,9 @@ submit(Task) ->
submit(Fun, Args) ->
call({submit, {Fun, Args}}).
%% @private
call(Req) ->
gen_server:call(worker(), Req, infinity).
-spec(submit(atom(), fun(), list(any())) -> any()).
submit(Pool, Fun, Args) ->
call(Pool, {submit, {Fun, Args}}).
%% @doc Submit work to the pool asynchronously.
-spec(async_submit(task()) -> ok).
@ -81,14 +83,30 @@ async_submit(Task) ->
async_submit(Fun, Args) ->
cast({async_submit, {Fun, Args}}).
-spec(async_submit(atom(), fun(), list(any())) -> ok).
async_submit(Pool, Fun, Args) ->
cast(Pool, {async_submit, {Fun, Args}}).
%% @private
call(Req) ->
gen_server:call(worker(), Req, infinity).
call(Pool, Req) ->
gen_server:call(worker(Pool), Req, infinity).
%% @private
cast(Msg) ->
gen_server:cast(worker(), Msg).
cast(Pool, Msg) ->
gen_server:cast(worker(Pool), Msg).
%% @private
worker() ->
gproc_pool:pick_worker(?POOL).
worker(Pool) ->
gproc_pool:pick_worker(Pool).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------