Support complex client type

This commit is contained in:
terry-xiaoyu 2019-07-09 00:38:46 +08:00
parent 5bd3905354
commit d963056757
1 changed files with 39 additions and 12 deletions

View File

@ -35,7 +35,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {pool, id, client, mod, on_reconnect, opts}). -record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}).
%%%============================================================================= %%%=============================================================================
%%% Callback %%% Callback
@ -88,19 +88,27 @@ set_reconnect_callback(Pid, OnReconnect) ->
init([Pool, Id, Mod, Opts]) -> init([Pool, Id, Mod, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts, State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
on_reconnect = proplists:get_value(on_reconnect, Opts)}, on_reconnect = proplists:get_value(on_reconnect, Opts),
on_disconnect = proplists:get_value(on_disconnect, Opts)},
case connect(State) of case connect(State) of
{ok, Client} when is_pid(Client) -> {ok, Client} when is_pid(Client) ->
erlang:link(Client), erlang:link(Client),
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
{ok, State#state{client = Client}}; {ok, State#state{client = Client, supervisees = [Client]}};
{{ok, Client}, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) ->
[erlang:link(P) || P <- SupPids],
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
{ok, State#state{client = Client, supervisees = SupPids}};
{error, Error} -> {error, Error} ->
{stop, Error} {stop, Error}
end. end.
handle_call(is_connected, _From, State = #state{client = Client}) -> handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) ->
{reply, Client =/= undefined andalso is_process_alive(Client), State}; {reply, Client =/= undefined andalso is_process_alive(Client), State};
handle_call(is_connected, _From, State = #state{client = Client}) ->
{reply, Client =/= undefined, State};
handle_call(client, _From, State = #state{client = undefined}) -> handle_call(client, _From, State = #state{client = undefined}) ->
{reply, {error, disconnected}, State}; {reply, {error, disconnected}, State};
@ -113,12 +121,16 @@ handle_cast({set_reconn_callbk, OnReconnect}, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = SupPids}) ->
case lists:member(Pid, SupPids) of
true ->
case proplists:get_value(auto_reconnect, Opts, false) of case proplists:get_value(auto_reconnect, Opts, false) of
false -> {stop, Reason, State};
Secs -> reconnect(Secs, State)
end;
false -> false ->
{stop, Reason, State}; logger:warning("~p received unexpected exit: ~p, ~p", [?MODULE, Pid, Reason]),
Secs -> {noreply, State}
reconnect(Secs, State)
end; end;
handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) -> handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) ->
@ -126,6 +138,9 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect})
{ok, Client} -> {ok, Client} ->
handle_reconnect(Client, OnReconnect), handle_reconnect(Client, OnReconnect),
{noreply, State#state{client = Client}}; {noreply, State#state{client = Client}};
{{ok, Client}, #{supervisees := SupPids} = _SupOpts} ->
handle_reconnect(Client, OnReconnect),
{noreply, State#state{client = Client, supervisees = SupPids}};
{Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' -> {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' ->
reconnect(proplists:get_value(auto_reconnect, Opts), State) reconnect(proplists:get_value(auto_reconnect, Opts), State)
end; end;
@ -133,7 +148,10 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect})
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, #state{pool = Pool, id = Id,
client = Client,
on_disconnect = Disconnect}) ->
handle_disconnect(Client, Disconnect),
gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -157,11 +175,20 @@ connopts([{auto_reconnect, _} | Opts], Acc) ->
connopts([Opt | Opts], Acc) -> connopts([Opt | Opts], Acc) ->
connopts(Opts, [Opt | Acc]). connopts(Opts, [Opt | Acc]).
reconnect(Secs, State) -> reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) ->
[erlang:unlink(P) || P <- SubPids, is_pid(P)],
handle_disconnect(Client, Disconnect),
erlang:send_after(timer:seconds(Secs), self(), reconnect), erlang:send_after(timer:seconds(Secs), self(), reconnect),
{noreply, State#state{client = undefined}}. {noreply, State#state{client = undefined, supervisees = []}}.
handle_reconnect(_, undefined) -> handle_reconnect(_, undefined) ->
ok; ok;
handle_reconnect(Client, OnReconnect) -> handle_reconnect(Client, OnReconnect) ->
OnReconnect(Client). OnReconnect(Client).
handle_disconnect(undefined, _) ->
ok;
handle_disconnect(_, undefined) ->
ok;
handle_disconnect(Client, Disconnect) ->
Disconnect(Client).