diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 588c96bee..2b1aeb9b5 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -35,7 +35,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id, client, mod, on_reconnect, opts}). +-record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}). %%%============================================================================= %%% Callback @@ -88,19 +88,27 @@ set_reconnect_callback(Pid, OnReconnect) -> init([Pool, Id, Mod, Opts]) -> process_flag(trap_exit, true), State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts, - on_reconnect = proplists:get_value(on_reconnect, Opts)}, + on_reconnect = proplists:get_value(on_reconnect, Opts), + on_disconnect = proplists:get_value(on_disconnect, Opts)}, case connect(State) of {ok, Client} when is_pid(Client) -> erlang:link(Client), gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), - {ok, State#state{client = Client}}; + {ok, State#state{client = Client, supervisees = [Client]}}; + {{ok, Client}, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) -> + [erlang:link(P) || P <- SupPids], + gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), + {ok, State#state{client = Client, supervisees = SupPids}}; {error, Error} -> {stop, Error} end. -handle_call(is_connected, _From, State = #state{client = Client}) -> +handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) -> {reply, Client =/= undefined andalso is_process_alive(Client), State}; +handle_call(is_connected, _From, State = #state{client = Client}) -> + {reply, Client =/= undefined, State}; + handle_call(client, _From, State = #state{client = undefined}) -> {reply, {error, disconnected}, State}; @@ -113,12 +121,16 @@ handle_cast({set_reconn_callbk, OnReconnect}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> - case proplists:get_value(auto_reconnect, Opts, false) of +handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = SupPids}) -> + case lists:member(Pid, SupPids) of + true -> + case proplists:get_value(auto_reconnect, Opts, false) of + false -> {stop, Reason, State}; + Secs -> reconnect(Secs, State) + end; false -> - {stop, Reason, State}; - Secs -> - reconnect(Secs, State) + logger:warning("~p received unexpected exit: ~p, ~p", [?MODULE, Pid, Reason]), + {noreply, State} end; handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) -> @@ -126,6 +138,9 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) {ok, Client} -> handle_reconnect(Client, OnReconnect), {noreply, State#state{client = Client}}; + {{ok, Client}, #{supervisees := SupPids} = _SupOpts} -> + handle_reconnect(Client, OnReconnect), + {noreply, State#state{client = Client, supervisees = SupPids}}; {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' -> reconnect(proplists:get_value(auto_reconnect, Opts), State) end; @@ -133,7 +148,10 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #state{pool = Pool, id = Id, + client = Client, + on_disconnect = Disconnect}) -> + handle_disconnect(Client, Disconnect), gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). code_change(_OldVsn, State, _Extra) -> @@ -157,11 +175,20 @@ connopts([{auto_reconnect, _} | Opts], Acc) -> connopts([Opt | Opts], Acc) -> connopts(Opts, [Opt | Acc]). -reconnect(Secs, State) -> +reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) -> + [erlang:unlink(P) || P <- SubPids, is_pid(P)], + handle_disconnect(Client, Disconnect), erlang:send_after(timer:seconds(Secs), self(), reconnect), - {noreply, State#state{client = undefined}}. + {noreply, State#state{client = undefined, supervisees = []}}. handle_reconnect(_, undefined) -> ok; handle_reconnect(Client, OnReconnect) -> OnReconnect(Client). + +handle_disconnect(undefined, _) -> + ok; +handle_disconnect(_, undefined) -> + ok; +handle_disconnect(Client, Disconnect) -> + Disconnect(Client). \ No newline at end of file