Merge pull request #10 from emqx/complex_client
Support complex client type
This commit is contained in:
commit
253ffcdb5e
|
@ -35,7 +35,7 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, {pool, id, client, mod, on_reconnect, opts}).
|
-record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Callback
|
%%% Callback
|
||||||
|
@ -88,19 +88,27 @@ set_reconnect_callback(Pid, OnReconnect) ->
|
||||||
init([Pool, Id, Mod, Opts]) ->
|
init([Pool, Id, Mod, Opts]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
|
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
|
||||||
on_reconnect = proplists:get_value(on_reconnect, Opts)},
|
on_reconnect = proplists:get_value(on_reconnect, Opts),
|
||||||
|
on_disconnect = proplists:get_value(on_disconnect, Opts)},
|
||||||
case connect(State) of
|
case connect(State) of
|
||||||
{ok, Client} when is_pid(Client) ->
|
{ok, Client} when is_pid(Client) ->
|
||||||
erlang:link(Client),
|
erlang:link(Client),
|
||||||
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
|
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
|
||||||
{ok, State#state{client = Client}};
|
{ok, State#state{client = Client, supervisees = [Client]}};
|
||||||
|
{{ok, Client}, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) ->
|
||||||
|
[erlang:link(P) || P <- SupPids],
|
||||||
|
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
|
||||||
|
{ok, State#state{client = Client, supervisees = SupPids}};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
{stop, Error}
|
{stop, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call(is_connected, _From, State = #state{client = Client}) ->
|
handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) ->
|
||||||
{reply, Client =/= undefined andalso is_process_alive(Client), State};
|
{reply, Client =/= undefined andalso is_process_alive(Client), State};
|
||||||
|
|
||||||
|
handle_call(is_connected, _From, State = #state{client = Client}) ->
|
||||||
|
{reply, Client =/= undefined, State};
|
||||||
|
|
||||||
handle_call(client, _From, State = #state{client = undefined}) ->
|
handle_call(client, _From, State = #state{client = undefined}) ->
|
||||||
{reply, {error, disconnected}, State};
|
{reply, {error, disconnected}, State};
|
||||||
|
|
||||||
|
@ -113,12 +121,16 @@ handle_cast({set_reconn_callbk, OnReconnect}, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) ->
|
handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = SupPids}) ->
|
||||||
case proplists:get_value(auto_reconnect, Opts, false) of
|
case lists:member(Pid, SupPids) of
|
||||||
|
true ->
|
||||||
|
case proplists:get_value(auto_reconnect, Opts, false) of
|
||||||
|
false -> {stop, Reason, State};
|
||||||
|
Secs -> reconnect(Secs, State)
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
{stop, Reason, State};
|
logger:warning("~p received unexpected exit: ~p, ~p", [?MODULE, Pid, Reason]),
|
||||||
Secs ->
|
{noreply, State}
|
||||||
reconnect(Secs, State)
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) ->
|
handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) ->
|
||||||
|
@ -126,6 +138,9 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect})
|
||||||
{ok, Client} ->
|
{ok, Client} ->
|
||||||
handle_reconnect(Client, OnReconnect),
|
handle_reconnect(Client, OnReconnect),
|
||||||
{noreply, State#state{client = Client}};
|
{noreply, State#state{client = Client}};
|
||||||
|
{{ok, Client}, #{supervisees := SupPids} = _SupOpts} ->
|
||||||
|
handle_reconnect(Client, OnReconnect),
|
||||||
|
{noreply, State#state{client = Client, supervisees = SupPids}};
|
||||||
{Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' ->
|
{Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' ->
|
||||||
reconnect(proplists:get_value(auto_reconnect, Opts), State)
|
reconnect(proplists:get_value(auto_reconnect, Opts), State)
|
||||||
end;
|
end;
|
||||||
|
@ -133,7 +148,10 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect})
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
terminate(_Reason, #state{pool = Pool, id = Id,
|
||||||
|
client = Client,
|
||||||
|
on_disconnect = Disconnect}) ->
|
||||||
|
handle_disconnect(Client, Disconnect),
|
||||||
gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}).
|
gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -157,11 +175,20 @@ connopts([{auto_reconnect, _} | Opts], Acc) ->
|
||||||
connopts([Opt | Opts], Acc) ->
|
connopts([Opt | Opts], Acc) ->
|
||||||
connopts(Opts, [Opt | Acc]).
|
connopts(Opts, [Opt | Acc]).
|
||||||
|
|
||||||
reconnect(Secs, State) ->
|
reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) ->
|
||||||
|
[erlang:unlink(P) || P <- SubPids, is_pid(P)],
|
||||||
|
handle_disconnect(Client, Disconnect),
|
||||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||||
{noreply, State#state{client = undefined}}.
|
{noreply, State#state{client = undefined, supervisees = []}}.
|
||||||
|
|
||||||
handle_reconnect(_, undefined) ->
|
handle_reconnect(_, undefined) ->
|
||||||
ok;
|
ok;
|
||||||
handle_reconnect(Client, OnReconnect) ->
|
handle_reconnect(Client, OnReconnect) ->
|
||||||
OnReconnect(Client).
|
OnReconnect(Client).
|
||||||
|
|
||||||
|
handle_disconnect(undefined, _) ->
|
||||||
|
ok;
|
||||||
|
handle_disconnect(_, undefined) ->
|
||||||
|
ok;
|
||||||
|
handle_disconnect(Client, Disconnect) ->
|
||||||
|
Disconnect(Client).
|
Loading…
Reference in New Issue