%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_retainer_pool).

-behaviour(gen_server).

-include_lib("emqx/include/logger.hrl").

%% API
-export([start_link/2,
         async_submit/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3, format_status/2]).

%% Name of the pool from which async_submit/2 picks its worker.
-define(POOL, ?MODULE).

%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Submits a task for asynchronous execution by a pool worker.
%%
%% The task is sent as the cast `{async_submit, {Fun, Args}}'; the
%% receiving worker applies `Fun' to `Args' and logs any exception the
%% task raises. The caller never sees the result of the task: the
%% return value is only that of the cast itself.
%% @end
%%--------------------------------------------------------------------
-spec async_submit(Fun :: function(), Args :: list()) -> ok.
async_submit(Fun, Args) ->
    cast({async_submit, {Fun, Args}}).

%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% The process is registered locally under the name returned by
%% `emqx_misc:proc_name(?MODULE, Id)', and `[Pool, Id]' is handed to
%% init/1. The `hibernate_after' option is set to 1000.
%% @end
%%--------------------------------------------------------------------
-spec start_link(atom(), pos_integer()) ->
          {ok, Pid :: pid()} |
          {error, Error :: {already_started, pid()}} |
          {error, Error :: term()} |
          ignore.
start_link(Pool, Id) ->
    gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
                          ?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% Connects this process to the gproc pool `Pool' as worker
%% `{Pool, Id}' and keeps both values in the state map so terminate/2
%% can disconnect the same worker again. The `true =' match makes the
%% process crash if the connect call returns anything else.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
          {ok, State :: term()} |
          {ok, State :: term(), Timeout :: timeout()} |
          {ok, State :: term(), hibernate} |
          {stop, Reason :: term()} |
          ignore.
init([PoolName, WorkerId]) ->
    WorkerName = {PoolName, WorkerId},
    true = gproc_pool:connect_worker(PoolName, WorkerName),
    InitialState = #{pool => PoolName, id => WorkerId},
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% No synchronous requests are supported: every call is logged as an
%% error and answered with `ignored', leaving the state untouched.
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
          {reply, Reply :: term(), NewState :: term()} |
          {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
          {reply, Reply :: term(), NewState :: term(), hibernate} |
          {noreply, NewState :: term()} |
          {noreply, NewState :: term(), Timeout :: timeout()} |
          {noreply, NewState :: term(), hibernate} |
          {stop, Reason :: term(), Reply :: term(), NewState :: term()} |
          {stop, Reason :: term(), NewState :: term()}.
handle_call(Request, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Request]),
    {reply, ignored, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
          {noreply, NewState :: term()} |
          {noreply, NewState :: term(), Timeout :: timeout()} |
          {noreply, NewState :: term(), hibernate} |
          {stop, Reason :: term(), NewState :: term()}.
%% Runs a submitted task inside the worker. Any exception raised by the
%% task is caught and logged so that a failing task does not take the
%% worker down; all other casts are logged as unexpected.
handle_cast({async_submit, Task}, State) ->
    try
        run(Task)
    catch
        _:Reason:Stack ->
            ?LOG(error, "Error: ~0p, ~0p", [Reason, Stack])
    end,
    {noreply, State};
handle_cast(Unexpected, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% No plain messages are expected: each one is logged as an error and
%% dropped, leaving the state untouched.
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
          {noreply, NewState :: term()} |
          {noreply, NewState :: term(), Timeout :: timeout()} |
          {noreply, NewState :: term(), hibernate} |
          {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(Unexpected, State) ->
    ?LOG(error, "Unexpected info: ~p", [Unexpected]),
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It undoes the work of init/1 by disconnecting the
%% worker `{Pool, Id}' from the gproc pool. The return value is
%% ignored by gen_server.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
                State :: term()) -> any().
terminate(_Reason, #{pool := PoolName, id := WorkerId}) ->
    WorkerName = {PoolName, WorkerId},
    gproc_pool:disconnect_worker(PoolName, WorkerName).

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% The state needs no conversion and is returned unchanged.
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
                  State :: term(),
                  Extra :: term()) ->
          {ok, NewState :: term()} |
          {error, Reason :: term()}.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for changing the form and appearance
%% of gen_server status when it is returned from sys:get_status/1,2
%% or when it appears in termination error logs.
%%
%% The status is returned as received, without any reformatting.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
                    Status :: list()) -> Status :: term().
format_status(_Opt, Status) ->
    Status.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% @private
%% Sends `Msg' as a gen_server cast to the worker chosen by worker/0.
cast(Msg) ->
    Worker = worker(),
    gen_server:cast(Worker, Msg).

%% @private
%% Asks gproc_pool for a worker of ?POOL, passing the calling process'
%% pid as the selection value.
%% NOTE(review): presumably the pool hashes this value so one caller
%% keeps hitting the same worker - confirm against the pool's type.
worker() ->
    Caller = self(),
    gproc_pool:pick_worker(?POOL, Caller).

%% Executes a task. Three shapes are accepted:
%%   {Mod, Fun, Args} - applied as Mod:Fun(Args...)
%%   {Fun, Args}      - a fun applied to an argument list
%%   Fun              - a fun called with no arguments
%% Anything else fails with function_clause.
run({Mod, Fun, Args}) ->
    apply(Mod, Fun, Args);
run({Fun, Args}) when is_function(Fun), is_list(Args) ->
    apply(Fun, Args);
run(Fun) when is_function(Fun) ->
    Fun().