diff --git a/rebar b/rebar deleted file mode 100755 index c2b7e2022..000000000 Binary files a/rebar and /dev/null differ diff --git a/rebar.config b/rebar.config index fb70bfda2..29b60bc42 100644 --- a/rebar.config +++ b/rebar.config @@ -24,7 +24,5 @@ {eunit_opts, [verbose]}. -{deps, [ - {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}} -]}. +{deps, [{gproc, "0.8.0"}]}. diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 000000000..d55839bda --- /dev/null +++ b/rebar.lock @@ -0,0 +1,4 @@ +[{<<"gproc">>, + {git,"git://github.com/uwiger/gproc.git", + {ref,"b7b0748d7adaf9b2243921d7e9cf320690eb0544"}}, + 0}]. diff --git a/src/ecpool.app.src b/src/ecpool.app.src index d35073e48..93b9f8293 100644 --- a/src/ecpool.app.src +++ b/src/ecpool.app.src @@ -1,13 +1,15 @@ {application, ecpool, [ {description, "Erlang Client/Connection Pool"}, - {vsn, "0.3.1"}, + {vsn, "git"}, {registered, []}, {applications, [ kernel, stdlib, gproc ]}, - {mod, {ecpool_app, []}}, - {env, []} + {mod, { ecpool_app, []}}, + {env, []}, + {licenses,["Apache-2.0"]}, + {links,[{"Github","https://github.com/emqx/ecpool"}]} ]}. diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 4b206de8b..08cd9ebd5 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -32,7 +32,7 @@ code_change/3 ]). --record(state, {pool, id, client, mod, on_reconnect, opts}). +-record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}). %%-------------------------------------------------------------------- %% Callback @@ -78,6 +78,10 @@ is_connected(Pid) -> set_reconnect_callback(Pid, OnReconnect) -> gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). +-spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok). +set_reconnect_callback(Pid, OnReconnect) -> + gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -85,12 +89,12 @@ set_reconnect_callback(Pid, OnReconnect) -> init([Pool, Id, Mod, Opts]) -> process_flag(trap_exit, true), State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts, - on_reconnect = proplists:get_value(on_reconnect, Opts)}, - case connect(State) of - {ok, Client} when is_pid(Client) -> - erlang:link(Client), + on_reconnect = proplists:get_value(on_reconnect, Opts), + on_disconnect = proplists:get_value(on_disconnect, Opts)}, + case connect_internal(State) of + {ok, NewState} -> gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), - {ok, State#state{client = Client}}; + {ok, NewState}; {error, Error} -> {stop, Error} end. @@ -99,6 +103,9 @@ handle_call(is_connected, _From, State = #state{client = Client}) -> IsAlive = Client =/= undefined andalso is_process_alive(Client), {reply, IsAlive, State}; +handle_call(is_connected, _From, State = #state{client = Client}) -> + {reply, Client =/= undefined, State}; + handle_call(client, _From, State = #state{client = undefined}) -> {reply, {error, disconnected}, State}; @@ -112,37 +119,42 @@ handle_call(Req, _From, State) -> handle_cast({set_reconn_callbk, OnReconnect}, State) -> {noreply, State#state{on_reconnect = OnReconnect}}; +handle_cast({set_reconn_callbk, OnReconnect}, State) -> + {noreply, State#state{on_reconnect = OnReconnect}}; + handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> - case proplists:get_value(auto_reconnect, Opts, false) of +handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = SupPids}) -> + case lists:member(Pid, SupPids) of + true -> + case proplists:get_value(auto_reconnect, Opts, false) of + false -> {stop, Reason, State}; + Secs -> reconnect(Secs, State) + end; false -> - {stop, Reason, State}; - Secs -> - reconnect_after(Secs, State) + logger:debug("~p received unexpected exit:~0p from ~p. Supervisees: ~p", + [?MODULE, Reason, Pid, SupPids]), + {noreply, State} end; -handle_info(reconnect, State = #state{pool = Pool, opts = Opts, on_reconnect = OnReconnect}) -> - try connect(State) of - {ok, Client} -> - handle_reconnect(Client, OnReconnect), - {noreply, State#state{client = Client}}; - {error, Reason} -> - logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]), - reconnect_after(proplists:get_value(auto_reconnect, Opts), State) - catch - _Error:Reason -> - logger:error("[PoolWorker] ~p reconnect error: ~p", [Pool, Reason]), - reconnect_after(proplists:get_value(auto_reconnect, Opts), State) - end; +handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect}) -> + case connect_internal(State) of + {ok, NewState = #state{client = Client}} -> + handle_reconnect(Client, OnReconnect), + {noreply, NewState}; + {Err, _Reason} when Err =:= error orelse Err =:= 'EXIT' -> + reconnect(proplists:get_value(auto_reconnect, Opts), State) + end; handle_info(Info, State) -> logger:error("[PoolWorker] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id, opts = Opts}) -> - ok = maybe_apply(proplists:get_value(unbind, Opts), self()), +terminate(_Reason, #state{pool = Pool, id = Id, + client = Client, + on_disconnect = Disconnect}) -> + handle_disconnect(Client, Disconnect), gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). code_change(_OldVsn, State, _Extra) -> @@ -152,8 +164,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%-------------------------------------------------------------------- -connect(#state{mod = Mod, opts = Opts}) -> - Mod:connect(connopts(Opts, [])). +connect(#state{mod = Mod, opts = Opts, id = Id}) -> + Mod:connect([{ecpool_worker_id, Id} | connopts(Opts, [])]). connopts([], Acc) -> Acc; @@ -170,8 +182,10 @@ connopts([{unbind, _}|Opts], Acc) -> connopts([Opt|Opts], Acc) -> connopts(Opts, [Opt|Acc]). -reconnect_after(Secs, State) -> - _ = erlang:send_after(timer:seconds(Secs), self(), reconnect), +reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) -> + [erlang:unlink(P) || P <- SubPids, is_pid(P)], + handle_disconnect(Client, Disconnect), + erlang:send_after(timer:seconds(Secs), self(), reconnect), {noreply, State#state{client = undefined}}. handle_reconnect(_, undefined) -> @@ -179,6 +193,23 @@ handle_reconnect(_, undefined) -> handle_reconnect(Client, OnReconnect) -> OnReconnect(Client). -maybe_apply(undefined, _) -> ok; -maybe_apply(Fun, Arg) -> erlang:apply(Fun, [Arg]). +handle_disconnect(undefined, _) -> + ok; +handle_disconnect(_, undefined) -> + ok; +handle_disconnect(Client, Disconnect) -> + Disconnect(Client). +connect_internal(State) -> + try connect(State) of + {ok, Client} when is_pid(Client) -> + erlang:link(Client), + {ok, State#state{client = Client, supervisees = [Client]}}; + {ok, Client, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) -> + [erlang:link(P) || P <- SupPids], + {ok, State#state{client = Client, supervisees = SupPids}}; + {error, Error} -> + {error, Error} + catch + _C:Reason -> {error, Reason} + end.