diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index d25aecc6b..dbaf8937e 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -16,5 +16,6 @@ -define(APP, emqx_retainer). -define(TAB, ?APP). +-define(POOL, retainer_worker_pool). -record(retained, {topic, msg, expiry_time}). diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 84e49c5be..281aacb6b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -69,7 +69,7 @@ on_session_subscribed(_, _, #{share := ShareName}) when ShareName =/= undefined ok; on_session_subscribed(_, Topic, #{rh := Rh, is_new := IsNew}) -> case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> emqx_pool:async_submit(fun ?MODULE:dispatch/2, [self(), Topic]); + true -> emqx_pool:async_submit(?POOL, fun ?MODULE:dispatch/2, [self(), Topic]); _ -> ok end. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index 550858afa..01e8770e6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -18,21 +18,36 @@ -behaviour(supervisor). --export([start_link/1]). +-include("emqx_retainer.hrl"). +-export([start_link/1]). -export([init/1]). start_link(Env) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). init([Env]) -> - {ok, {{one_for_one, 10, 3600}, - [#{id => retainer, - start => {emqx_retainer, start_link, [Env]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_retainer]} || not is_managed_by_modules()]}}. + Retainer = #{ + id => retainer, + start => {emqx_retainer, start_link, [Env]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_retainer] + }, + WorkerPool = #{ + id => ?POOL, + start => {emqx_pool_sup, start_link, [?POOL, random, {emqx_pool, start_link, []}]}, + restart => permanent, + shutdown => 5000, + type => supervisor, + modules => [emqx_pool_sup] + }, + ChildSpecs = case is_managed_by_modules() of + false -> [Retainer, WorkerPool]; + true -> [WorkerPool] + end, + {ok, {{one_for_one, 10, 3600}, ChildSpecs}}. -ifdef(EMQX_ENTERPRISE). diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index ae1ed1d23..0872b4186 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -28,8 +28,10 @@ -export([ submit/1 , submit/2 + , submit/3 , async_submit/1 , async_submit/2 + , async_submit/3 ]). -ifdef(TEST). @@ -56,7 +58,7 @@ %% @doc Start pool. -spec(start_link(atom(), pos_integer()) -> startlink_ret()). start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + gen_server:start_link({local, emqx_misc:proc_name(Pool, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %% @doc Submit work to the pool. @@ -68,9 +70,9 @@ submit(Task) -> submit(Fun, Args) -> call({submit, {Fun, Args}}). -%% @private -call(Req) -> - gen_server:call(worker(), Req, infinity). +-spec(submit(atom(), fun(), list(any())) -> any()). +submit(Pool, Fun, Args) -> + call(Pool, {submit, {Fun, Args}}). %% @doc Submit work to the pool asynchronously. -spec(async_submit(task()) -> ok). @@ -81,14 +83,30 @@ async_submit(Task) -> async_submit(Fun, Args) -> cast({async_submit, {Fun, Args}}). +-spec(async_submit(atom(), fun(), list(any())) -> ok). +async_submit(Pool, Fun, Args) -> + cast(Pool, {async_submit, {Fun, Args}}). + +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +call(Pool, Req) -> + gen_server:call(worker(Pool), Req, infinity). + %% @private cast(Msg) -> gen_server:cast(worker(), Msg). +cast(Pool, Msg) -> + gen_server:cast(worker(Pool), Msg). %% @private worker() -> gproc_pool:pick_worker(?POOL). +worker(Pool) -> + gproc_pool:pick_worker(Pool). + %%-------------------------------------------------------------------- %% gen_server callbacks %%--------------------------------------------------------------------