Reconnect callback (#9)

* Link to Connection Pid

* Add reconnect callback function
This commit is contained in:
Gilbert 2019-06-10 22:12:33 +08:00 committed by GitHub
parent 640d6cc225
commit 93c263741c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 33 deletions

View File

@ -14,25 +14,20 @@
-module(ecpool). -module(ecpool).
-export([pool_spec/4, -export([pool_spec/4, start_pool/3, start_sup_pool/3, stop_sup_pool/1,
start_pool/3, get_client/1, get_client/2, with_client/2, with_client/3,
start_sup_pool/3, set_reconnect_callback/2,
stop_sup_pool/1, name/1, workers/1]).
get_client/1,
get_client/2,
with_client/2,
with_client/3,
workers/1
]).
-export([name/1]). -type pool_type() :: random | hash | round_robin.
-type(pool_name() :: atom()). -type reconn_callback() :: {fun((pid()) -> term())}.
-type(pool_type() :: random | hash | round_robin).
-type(option() :: {pool_size, pos_integer()} -type option() :: {pool_size, pos_integer()}
| {pool_type, pool_type()} | {pool_type, pool_type()}
| {auto_reconnect, false | pos_integer()} | {auto_reconnect, false | pos_integer()}
| tuple()). | {on_reconnect, reconn_callback()}
| tuple().
-export_type([pool_name/0, -export_type([pool_name/0,
pool_type/0, pool_type/0,
@ -70,6 +65,12 @@ get_client(Pool) ->
get_client(Pool, Key) -> get_client(Pool, Key) ->
gproc_pool:pick_worker(name(Pool), Key). gproc_pool:pick_worker(name(Pool), Key).
-spec(set_reconnect_callback(atom(), reconn_callback()) -> ok).
set_reconnect_callback(Pool, Callback) ->
[ecpool_worker:set_reconnect_callback(Worker, Callback)
|| {_WorkerName, Worker} <- ecpool:workers(Pool)],
ok.
%% @doc Call the fun with client/connection %% @doc Call the fun with client/connection
-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> any()). -spec(with_client(atom(), fun((Client :: pid()) -> any())) -> any()).
with_client(Pool, Fun) when is_atom(Pool) -> with_client(Pool, Fun) when is_atom(Pool) ->

View File

@ -19,8 +19,7 @@
-export([start_link/4]). -export([start_link/4]).
%% API Function Exports %% API Function Exports
-export([client/1]). -export([start_link/4, client/1, is_connected/1, set_reconnect_callback/2]).
-export([is_connected/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, -export([init/1,
@ -31,7 +30,7 @@
code_change/3 code_change/3
]). ]).
-record(state, {pool, id, client, mod, opts}). -record(state, {pool, id, client, mod, on_reconnect, opts}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Callback %% Callback
@ -73,17 +72,22 @@ client(Pid) ->
is_connected(Pid) -> is_connected(Pid) ->
gen_server:call(Pid, is_connected, infinity). gen_server:call(Pid, is_connected, infinity).
%%------------------------------------------------------------------------------ -spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
%% gen_server callbacks set_reconnect_callback(Pid, OnReconnect) ->
%%------------------------------------------------------------------------------ gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Pool, Id, Mod, Opts]) -> init([Pool, Id, Mod, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts}, State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
on_reconnect = proplists:get_value(on_reconnect, Opts)},
case connect(State) of case connect(State) of
{ok, Client} -> {ok, Client} when is_pid(Client) ->
ok = maybe_apply(proplists:get_value(bind, Opts), self()), erlang:link(Client),
true = gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
{ok, State#state{client = Client}}; {ok, State#state{client = Client}};
{error, Error} -> {error, Error} ->
{stop, Error} {stop, Error}
@ -103,8 +107,10 @@ handle_call(Req, _From, State) ->
logger:error("[PoolWorker] unexpected call: ~p", [Req]), logger:error("[PoolWorker] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast({set_reconn_callbk, OnReconnect}, State) ->
logger:error("[PoolWorker] unexpected cast: ~p", [Msg]), {noreply, State#state{on_reconnect = OnReconnect}};
handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) ->
@ -115,9 +121,10 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) ->
reconnect_after(Secs, State) reconnect_after(Secs, State)
end; end;
handle_info(reconnect, State = #state{pool= Pool, opts = Opts}) -> handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) ->
try connect(State) of case catch connect(State) of
{ok, Client} -> {ok, Client} ->
handle_reconnect(Client, OnReconnect),
{noreply, State#state{client = Client}}; {noreply, State#state{client = Client}};
{error, Reason} -> {error, Reason} ->
logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]), logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]),
@ -165,8 +172,7 @@ reconnect_after(Secs, State) ->
_ = erlang:send_after(timer:seconds(Secs), self(), reconnect), _ = erlang:send_after(timer:seconds(Secs), self(), reconnect),
{noreply, State#state{client = undefined}}. {noreply, State#state{client = undefined}}.
maybe_apply(undefined, _Self) -> handle_reconnect(_, undefined) ->
ok; ok;
maybe_apply(Fun, Self) -> handle_reconnect(Client, OnReconnect) ->
Fun(Self). OnReconnect(Client).