Merge pull request #19 from emqx/develop

Manual-pull-request-by-2019-09-06
This commit is contained in:
Shawn 2019-09-06 10:43:25 +08:00 committed by GitHub
commit 624d848f77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 64 deletions

View File

@ -1,15 +1,14 @@
{application, ecpool,
[
{description, "Erlang Client/Connection Pool"},
[{description, "Erlang Client/Connection Pool"},
{vsn, "git"},
{registered, []},
{applications, [
kernel,
{applications, [kernel,
stdlib,
gproc
]},
{mod, { ecpool_app, []}},
{mod, {ecpool_app, []}},
{env, []},
{licenses,["Apache-2.0"]},
{maintainers, ["Feng Lee <feng@emqx.io>"]},
{links,[{"Github","https://github.com/emqx/ecpool"}]}
]}.

View File

@ -16,27 +16,36 @@
-module(ecpool).
-export([pool_spec/4, start_pool/3, start_sup_pool/3, stop_sup_pool/1,
get_client/1, get_client/2, with_client/2, with_client/3,
set_reconnect_callback/2,
name/1, workers/1]).
-export([ pool_spec/4
, start_pool/3
, start_sup_pool/3
, stop_sup_pool/1
, get_client/1
, get_client/2
, with_client/2
, with_client/3
, name/1
, workers/1
]).
-export_type([pool_name/0,
pool_type/0,
option/0
-export([set_reconnect_callback/2]).
-export_type([ pool_name/0
, pool_type/0
, option/0
]).
-type pool_name() :: term().
-type(pool_name() :: term()).
-type pool_type() :: random | hash | round_robin.
-type(pool_type() :: random | hash | round_robin).
-type reconn_callback() :: {fun((pid()) -> term())}.
-type(reconn_callback() :: {fun((pid()) -> term())}).
-type option() :: {pool_size, pos_integer()}
-type(option() :: {pool_size, pos_integer()}
| {pool_type, pool_type()}
| {auto_reconnect, false | pos_integer()}
| {on_reconnect, reconn_callback()}
| tuple().
| tuple()).
pool_spec(ChildId, Pool, Mod, Opts) ->
#{id => ChildId,

View File

@ -18,22 +18,22 @@
-behaviour(gen_server).
-import(proplists, [get_value/3]).
%% API Function Exports
-export([start_link/2]).
-export([info/1]).
%% gen_server Function Exports
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-import(proplists, [get_value/3]).
-record(state, {name, size, type}).
%%--------------------------------------------------------------------

View File

@ -21,9 +21,9 @@
-export([start_link/0]).
%% API
-export([start_pool/3,
stop_pool/1,
get_pool/1
-export([ start_pool/3
, stop_pool/1
, get_pool/1
]).
-export([pools/0]).

View File

@ -21,18 +21,30 @@
-export([start_link/4]).
%% API Function Exports
-export([client/1, is_connected/1, set_reconnect_callback/2]).
%% gen_server Function Exports
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
-export([ client/1
, is_connected/1
, set_reconnect_callback/2
]).
-record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}).
%% gen_server Function Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {
pool :: ecpool:poo_name(),
id :: pos_integer(),
client :: pid() | undefined,
mod :: module(),
on_reconnect :: ecpool:reconn_callback(),
on_disconnect :: ecpool:reconn_callback(),
supervisees = [],
opts :: proplists:proplist()
}).
%%--------------------------------------------------------------------
%% Callback
@ -84,21 +96,27 @@ set_reconnect_callback(Pid, OnReconnect) ->
init([Pool, Id, Mod, Opts]) ->
process_flag(trap_exit, true),
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts,
State = #state{pool = Pool,
id = Id,
mod = Mod,
opts = Opts,
on_reconnect = proplists:get_value(on_reconnect, Opts),
on_disconnect = proplists:get_value(on_disconnect, Opts)},
on_disconnect = proplists:get_value(on_disconnect, Opts)
},
case connect_internal(State) of
{ok, NewState} ->
{ok, NewState} ->
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
{ok, NewState};
{error, Error} ->
{stop, Error}
{error, Error} -> {stop, Error}
end.
handle_call(is_connected, _From, State = #state{client = Client}) ->
handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) ->
IsAlive = Client =/= undefined andalso is_process_alive(Client),
{reply, IsAlive, State};
handle_call(is_connected, _From, State = #state{client = Client}) ->
{reply, Client =/= undefined, State};
handle_call(client, _From, State = #state{client = undefined}) ->
{reply, {error, disconnected}, State};
@ -165,10 +183,6 @@ connopts([{pool_type, _}|Opts], Acc) ->
connopts(Opts, Acc);
connopts([{auto_reconnect, _}|Opts], Acc) ->
connopts(Opts, Acc);
connopts([{bind, _}|Opts], Acc) ->
connopts(Opts, Acc);
connopts([{unbind, _}|Opts], Acc) ->
connopts(Opts, Acc);
connopts([Opt|Opts], Acc) ->
connopts(Opts, [Opt|Acc]).
@ -203,3 +217,4 @@ connect_internal(State) ->
catch
_C:Reason -> {error, Reason}
end.

View File

@ -17,26 +17,28 @@
-module(ecpool_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(POOL, test_pool).
-define(POOL_OPTS, [
%% schedulers number
{pool_size, 10},
%% round-robbin | random | hash
{pool_type, random},
%% false | pos_integer()
{auto_reconnect, false},
-define(POOL_OPTS,
[%% schedulers number
{pool_size, 10},
%% round-robbin | random | hash
{pool_type, random},
%% false | pos_integer()
{auto_reconnect, false},
%% DB Parameters
{host, "localhost"},
{port, 5432},
{username, "feng"},
{password, ""},
{database, "mqtt"},
{encoding, utf8}]).
%% DB Parameters
{host, "localhost"},
{port, 5432},
{username, "feng"},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]).
all() ->
[{group, all}].
@ -46,7 +48,9 @@ groups() ->
[t_start_pool,
t_start_sup_pool,
t_restart_client,
t_reconnect_client
t_reconnect_client,
t_multiprocess_client,
t_multiprocess_client_not_restart
]}].
init_per_suite(Config) ->
@ -107,3 +111,53 @@ t_reconnect_client(_Config) ->
timer:sleep(1100),
?assertEqual(4, length(ecpool:workers(?POOL))).
t_multiprocess_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}, {multiprocess, true}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
%% check client status
[begin
true = ecpool_worker:is_connected(Worker),
{ok, _C = {Pid1, Pid2}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid1),
true = is_process_alive(Pid2)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
%% stop one of the clients
Client = ecpool:with_client(?POOL,
fun(C = {P1, _P2}) ->
test_client:stop(P1, normal), C
end),
ct:sleep(1500),
%% check that client is reconnected
[begin
true = ecpool_worker:is_connected(Worker),
{ok, Client2 = {Pid3, Pid4}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid3),
true = is_process_alive(Pid4),
?assert(Client2 =/= Client)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
t_multiprocess_client_not_restart(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, false}, {multiprocess, true}]),
?assertEqual(4, length(ecpool:workers(?POOL))),
%% check client status
[begin
true = ecpool_worker:is_connected(Worker),
{ok, {Pid1, Pid2}} = ecpool_worker:client(Worker),
true = is_process_alive(Pid1),
true = is_process_alive(Pid2)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
%% stop all the clients
[begin
true = ecpool_worker:is_connected(Worker),
{ok, {Pid1, _Pid2}} = ecpool_worker:client(Worker),
test_client:stop(Pid1, normal)
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
ct:sleep(1500),
%% check that all the clients are disconnected and not restarted.
[begin
?assertEqual(false, ecpool_worker:is_connected(Worker))
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].

View File

@ -35,7 +35,14 @@
]).
connect(Opts) ->
gen_server:start_link(?MODULE, [Opts], []).
case proplists:get_value(multiprocess, Opts, false) of
true ->
{ok, Pid1} = gen_server:start_link(?MODULE, [Opts], []),
{ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []),
{ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}};
false ->
gen_server:start_link(?MODULE, [Opts], [])
end.
stop(Pid, Reason) ->
gen_server:call(Pid, {stop, Reason}).