unit test
This commit is contained in:
parent
ac24678d0e
commit
b2a7dffccc
|
@ -1,7 +1,7 @@
|
|||
{application, ecpool,
|
||||
[
|
||||
{description, "Erlang Client/Connection Pool"},
|
||||
{vsn, "0.1"},
|
||||
{vsn, "0.2"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -19,31 +19,38 @@
|
|||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool API.
|
||||
%%% @doc ecpool Main API.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool).
|
||||
|
||||
-export([name/1, start_pool/3, with_client/2, with_client/3, stop_pool/1]).
|
||||
|
||||
name(Pool) ->
|
||||
{?MODULE, Pool}.
|
||||
-export([start_pool/3, start_sup_pool/3, stop_sup_pool/1,
|
||||
with_client/2, with_client/3, name/1, workers/1]).
|
||||
|
||||
%% @doc Start the pool
|
||||
-spec start_pool(atom(), atom(), list()) -> {ok, pid()} | {error, any()}.
|
||||
start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
ecpool_pool_sup:start_link(Pool, Mod, Opts).
|
||||
|
||||
stop_pool(Pool) when is_atom(Pool) ->
|
||||
%% @doc Start the pool supervised by ecpool_sup
|
||||
start_sup_pool(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
ecpool_sup:start_pool(Pool, Mod, Opts).
|
||||
|
||||
%% @doc Start the pool supervised by ecpool_sup
|
||||
stop_sup_pool(Pool) when is_atom(Pool) ->
|
||||
ecpool_sup:stop_pool(Pool).
|
||||
|
||||
%% @doc Call the fun with client/connection
|
||||
-spec with_client(atom(), fun((Client :: pid()) -> any())) -> any().
|
||||
with_client(Pool, Fun) when is_atom(Pool) ->
|
||||
Worker = gproc_pool:pick_worker({?MODULE, Pool}),
|
||||
with_worker(Worker, Fun).
|
||||
with_worker(gproc_pool:pick_worker(name(Pool)), Fun).
|
||||
|
||||
%% @doc Call the fun with client/connection
|
||||
-spec with_client(atom(), any(), fun((Client :: pid()) -> any())) -> any().
|
||||
with_client(Pool, Key, Fun) when is_atom(Pool) ->
|
||||
Worker = gproc_pool:pick_worker({?MODULE, Pool}, Key),
|
||||
with_worker(Worker, Fun).
|
||||
with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun).
|
||||
|
||||
with_worker(Worker, Fun) ->
|
||||
case ecpool_worker:client(Worker) of
|
||||
|
@ -51,3 +58,9 @@ with_worker(Worker, Fun) ->
|
|||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
%% @doc ecpool name
|
||||
name(Pool) -> {?MODULE, Pool}.
|
||||
|
||||
%% @doc pool workers
|
||||
workers(Pool) -> gproc_pool:active_workers(name(Pool)).
|
||||
|
||||
|
|
|
@ -78,11 +78,9 @@ ensure_pool_worker(Pool, Name, Slot) ->
|
|||
error:exists -> ok
|
||||
end.
|
||||
|
||||
handle_call(info, _From, State = #state{name = Pool, size = Size,
|
||||
type = Type}) ->
|
||||
Workers = gproc_pool:active_workers(ecpool:name(Pool)),
|
||||
handle_call(info, _From, State = #state{name = Pool, size = Size, type = Type}) ->
|
||||
Info = [{pool_name, Pool}, {pool_size, Size},
|
||||
{pool_type, Type}, {workers, Workers}],
|
||||
{pool_type, Type}, {workers, ecpool:workers(Pool)}],
|
||||
{reply, Info, State};
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
-export([init/1]).
|
||||
|
||||
%% @doc Start supervisor.
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
-spec start_link() -> {ok, pid()} | {error, any()}.
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
|
@ -46,10 +46,10 @@ start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
|
|||
stop_pool(Pool) when is_atom(Pool) ->
|
||||
ChildId = child_id(Pool),
|
||||
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||
ok ->
|
||||
supervisor:delete_child(?MODULE, ChildId);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
ok ->
|
||||
supervisor:delete_child(?MODULE, ChildId);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% @doc All Pools supervisored by ecpool_sup.
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/4, client/1]).
|
||||
-export([start_link/4, client/1, is_connected/1]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
|
@ -69,7 +69,13 @@ start_link(Pool, Id, Mod, Opts) ->
|
|||
|
||||
%% @doc Get client/connection.
|
||||
-spec client(pid()) -> undefined | pid().
|
||||
client(Pid) -> gen_server:call(Pid, client, infinity).
|
||||
client(Pid) ->
|
||||
gen_server:call(Pid, client, infinity).
|
||||
|
||||
%% @doc Is client connected?
|
||||
-spec is_connected(pid()) -> boolean().
|
||||
is_connected(Pid) ->
|
||||
gen_server:call(Pid, is_connected).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
|
@ -86,6 +92,9 @@ init([Pool, Id, Mod, Opts]) ->
|
|||
{stop, Error}
|
||||
end.
|
||||
|
||||
handle_call(is_connected, _From, State = #state{client = Client}) ->
|
||||
{reply, Client =/= undefined andalso is_process_alive(Client), State};
|
||||
|
||||
handle_call(client, _From, State = #state{client = undefined}) ->
|
||||
{reply, {error, disconnected}, State};
|
||||
|
||||
|
@ -100,8 +109,7 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) ->
|
|||
false ->
|
||||
{stop, Reason, State};
|
||||
Secs ->
|
||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||
{noreply, State#state{client = undefined}}
|
||||
reconnect(Secs, State)
|
||||
end;
|
||||
|
||||
handle_info(reconnect, State = #state{opts = Opts}) ->
|
||||
|
@ -109,9 +117,7 @@ handle_info(reconnect, State = #state{opts = Opts}) ->
|
|||
{ok, Client} ->
|
||||
{noreply, State#state{client = Client}};
|
||||
{error, _Error} ->
|
||||
Secs = proplists:get_value(auto_reconnect, Opts),
|
||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||
{noreply, State#state{client = undefined}}
|
||||
reconnect(proplists:get_value(auto_reconnect, Opts), State)
|
||||
end;
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
|
@ -141,3 +147,7 @@ connopts([{auto_reconnect, _} | Opts], Acc) ->
|
|||
connopts([Opt | Opts], Acc) ->
|
||||
connopts(Opts, [Opt | Acc]).
|
||||
|
||||
reconnect(Secs, State) ->
|
||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||
{noreply, State#state{client = undefined}}.
|
||||
|
||||
|
|
|
@ -28,15 +28,11 @@
|
|||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/3, workers/1, init/1]).
|
||||
-export([start_link/3, init/1]).
|
||||
|
||||
start_link(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).
|
||||
|
||||
workers(WorkerSup) ->
|
||||
[{ChildId, Pid} || {ChildId, Pid, worker, _}
|
||||
<- supervisor:which_children(WorkerSup)].
|
||||
|
||||
init([Pool, Mod, Opts]) ->
|
||||
WorkerSpec = fun(Id) ->
|
||||
{{worker, Id}, {ecpool_worker, start_link, [Pool, Id, Mod, Opts]},
|
||||
|
|
Loading…
Reference in New Issue