Merge branch 'master' into develop

This commit is contained in:
Feng Lee 2019-08-26 14:38:32 +08:00 committed by GitHub
commit bb09d96f01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 38 deletions

BIN
rebar vendored

Binary file not shown.

View File

@ -24,7 +24,5 @@
{eunit_opts, [verbose]}. {eunit_opts, [verbose]}.
{deps, [ {deps, [{gproc, "0.8.0"}]}.
{gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}
]}.

4
rebar.lock Normal file
View File

@ -0,0 +1,4 @@
[{<<"gproc">>,
{git,"git://github.com/uwiger/gproc.git",
{ref,"b7b0748d7adaf9b2243921d7e9cf320690eb0544"}},
0}].

View File

@ -1,13 +1,15 @@
{application, ecpool, {application, ecpool,
[ [
{description, "Erlang Client/Connection Pool"}, {description, "Erlang Client/Connection Pool"},
{vsn, "0.3.1"}, {vsn, "git"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
gproc gproc
]}, ]},
{mod, {ecpool_app, []}}, {mod, { ecpool_app, []}},
{env, []} {env, []},
{licenses,["Apache-2.0"]},
{links,[{"Github","https://github.com/emqx/ecpool"}]}
]}. ]}.

View File

@ -32,7 +32,7 @@
code_change/3 code_change/3
]). ]).
-record(state, {pool, id, client, mod, on_reconnect, opts}). -record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Callback %% Callback
@ -78,6 +78,10 @@ is_connected(Pid) ->
set_reconnect_callback(Pid, OnReconnect) -> set_reconnect_callback(Pid, OnReconnect) ->
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
-spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
set_reconnect_callback(Pid, OnReconnect) ->
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -85,12 +89,12 @@ set_reconnect_callback(Pid, OnReconnect) ->
init([Pool, Id, Mod, Opts]) -> init([Pool, Id, Mod, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts, State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
on_reconnect = proplists:get_value(on_reconnect, Opts)}, on_reconnect = proplists:get_value(on_reconnect, Opts),
case connect(State) of on_disconnect = proplists:get_value(on_disconnect, Opts)},
{ok, Client} when is_pid(Client) -> case connect_internal(State) of
erlang:link(Client), {ok, NewState} ->
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
{ok, State#state{client = Client}}; {ok, NewState};
{error, Error} -> {error, Error} ->
{stop, Error} {stop, Error}
end. end.
@ -99,6 +103,9 @@ handle_call(is_connected, _From, State = #state{client = Client}) ->
IsAlive = Client =/= undefined andalso is_process_alive(Client), IsAlive = Client =/= undefined andalso is_process_alive(Client),
{reply, IsAlive, State}; {reply, IsAlive, State};
handle_call(is_connected, _From, State = #state{client = Client}) ->
{reply, Client =/= undefined, State};
handle_call(client, _From, State = #state{client = undefined}) -> handle_call(client, _From, State = #state{client = undefined}) ->
{reply, {error, disconnected}, State}; {reply, {error, disconnected}, State};
@ -112,37 +119,42 @@ handle_call(Req, _From, State) ->
handle_cast({set_reconn_callbk, OnReconnect}, State) -> handle_cast({set_reconn_callbk, OnReconnect}, State) ->
{noreply, State#state{on_reconnect = OnReconnect}}; {noreply, State#state{on_reconnect = OnReconnect}};
handle_cast({set_reconn_callbk, OnReconnect}, State) ->
{noreply, State#state{on_reconnect = OnReconnect}};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = SupPids}) ->
case proplists:get_value(auto_reconnect, Opts, false) of case lists:member(Pid, SupPids) of
true ->
case proplists:get_value(auto_reconnect, Opts, false) of
false -> {stop, Reason, State};
Secs -> reconnect(Secs, State)
end;
false -> false ->
{stop, Reason, State}; logger:debug("~p received unexpected exit:~0p from ~p. Supervisees: ~p",
Secs -> [?MODULE, Reason, Pid, SupPids]),
reconnect_after(Secs, State) {noreply, State}
end; end;
handle_info(reconnect, State = #state{pool = Pool, opts = Opts, on_reconnect = OnReconnect}) -> handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) ->
try connect(State) of case connect_internal(State) of
{ok, Client} -> {ok, NewState = #state{client = Client}} ->
handle_reconnect(Client, OnReconnect), handle_reconnect(Client, OnReconnect),
{noreply, State#state{client = Client}}; {noreply, NewState};
{error, Reason} -> {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' ->
logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]), reconnect(proplists:get_value(auto_reconnect, Opts), State)
reconnect_after(proplists:get_value(auto_reconnect, Opts), State) end;
catch
_Error:Reason ->
logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]),
reconnect_after(proplists:get_value(auto_reconnect, Opts), State)
end;
handle_info(Info, State) -> handle_info(Info, State) ->
logger:error("[PoolWorker] unexpected info: ~p", [Info]), logger:error("[PoolWorker] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id, opts = Opts}) -> terminate(_Reason, #state{pool = Pool, id = Id,
ok = maybe_apply(proplists:get_value(unbind, Opts), self()), client = Client,
on_disconnect = Disconnect}) ->
handle_disconnect(Client, Disconnect),
gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -152,8 +164,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
connect(#state{mod = Mod, opts = Opts}) -> connect(#state{mod = Mod, opts = Opts, id = Id}) ->
Mod:connect(connopts(Opts, [])). Mod:connect([{ecpool_worker_id, Id} | connopts(Opts, [])]).
connopts([], Acc) -> connopts([], Acc) ->
Acc; Acc;
@ -170,8 +182,10 @@ connopts([{unbind, _}|Opts], Acc) ->
connopts([Opt|Opts], Acc) -> connopts([Opt|Opts], Acc) ->
connopts(Opts, [Opt|Acc]). connopts(Opts, [Opt|Acc]).
reconnect_after(Secs, State) -> reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) ->
_ = erlang:send_after(timer:seconds(Secs), self(), reconnect), [erlang:unlink(P) || P <- SubPids, is_pid(P)],
handle_disconnect(Client, Disconnect),
erlang:send_after(timer:seconds(Secs), self(), reconnect),
{noreply, State#state{client = undefined}}. {noreply, State#state{client = undefined}}.
handle_reconnect(_, undefined) -> handle_reconnect(_, undefined) ->
@ -179,6 +193,23 @@ handle_reconnect(_, undefined) ->
handle_reconnect(Client, OnReconnect) -> handle_reconnect(Client, OnReconnect) ->
OnReconnect(Client). OnReconnect(Client).
maybe_apply(undefined, _) -> ok; handle_disconnect(undefined, _) ->
maybe_apply(Fun, Arg) -> erlang:apply(Fun, [Arg]). ok;
handle_disconnect(_, undefined) ->
ok;
handle_disconnect(Client, Disconnect) ->
Disconnect(Client).
connect_internal(State) ->
try connect(State) of
{ok, Client} when is_pid(Client) ->
erlang:link(Client),
{ok, State#state{client = Client, supervisees = [Client]}};
{ok, Client, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) ->
[erlang:link(P) || P <- SupPids],
{ok, State#state{client = Client, supervisees = SupPids}};
{error, Error} ->
{error, Error}
catch
_C:Reason -> {error, Reason}
end.