parent
363afd9aa2
commit
ab7ec8cfd1
|
@ -38,7 +38,7 @@
|
|||
-record(state, {
|
||||
pool :: ecpool:poo_name(),
|
||||
id :: pos_integer(),
|
||||
client :: pid(),
|
||||
client :: pid() | undefined,
|
||||
mod :: module(),
|
||||
on_reconnect :: ecpool:reconn_callback(),
|
||||
on_disconnect :: ecpool:reconn_callback(),
|
||||
|
@ -110,10 +110,13 @@ init([Pool, Id, Mod, Opts]) ->
|
|||
{error, Error} -> {stop, Error}
|
||||
end.
|
||||
|
||||
handle_call(is_connected, _From, State = #state{client = Client}) ->
|
||||
handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) ->
|
||||
IsAlive = Client =/= undefined andalso is_process_alive(Client),
|
||||
{reply, IsAlive, State};
|
||||
|
||||
handle_call(is_connected, _From, State = #state{client = Client}) ->
|
||||
{reply, Client =/= undefined, State};
|
||||
|
||||
handle_call(client, _From, State = #state{client = undefined}) ->
|
||||
{reply, {error, disconnected}, State};
|
||||
|
||||
|
|
|
@ -17,13 +17,14 @@
|
|||
-module(ecpool_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(POOL, test_pool).
|
||||
|
||||
-define(POOL_OPTS, [
|
||||
%% schedulers number
|
||||
-define(POOL_OPTS,
|
||||
[%% schedulers number
|
||||
{pool_size, 10},
|
||||
%% round-robbin | random | hash
|
||||
{pool_type, random},
|
||||
|
@ -36,7 +37,8 @@
|
|||
{username, "feng"},
|
||||
{password, ""},
|
||||
{database, "mqtt"},
|
||||
{encoding, utf8}]).
|
||||
{encoding, utf8}
|
||||
]).
|
||||
|
||||
all() ->
|
||||
[{group, all}].
|
||||
|
@ -46,7 +48,9 @@ groups() ->
|
|||
[t_start_pool,
|
||||
t_start_sup_pool,
|
||||
t_restart_client,
|
||||
t_reconnect_client
|
||||
t_reconnect_client,
|
||||
t_multiprocess_client,
|
||||
t_multiprocess_client_not_restart
|
||||
]}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
|
@ -107,3 +111,53 @@ t_reconnect_client(_Config) ->
|
|||
timer:sleep(1100),
|
||||
?assertEqual(4, length(ecpool:workers(?POOL))).
|
||||
|
||||
t_multiprocess_client(_Config) ->
|
||||
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}, {multiprocess, true}]),
|
||||
?assertEqual(4, length(ecpool:workers(?POOL))),
|
||||
%% check client status
|
||||
[begin
|
||||
true = ecpool_worker:is_connected(Worker),
|
||||
{ok, _C = {Pid1, Pid2}} = ecpool_worker:client(Worker),
|
||||
true = is_process_alive(Pid1),
|
||||
true = is_process_alive(Pid2)
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
|
||||
|
||||
%% stop one of the clients
|
||||
Client = ecpool:with_client(?POOL,
|
||||
fun(C = {P1, _P2}) ->
|
||||
test_client:stop(P1, normal), C
|
||||
end),
|
||||
ct:sleep(1500),
|
||||
|
||||
%% check that client is reconnected
|
||||
[begin
|
||||
true = ecpool_worker:is_connected(Worker),
|
||||
{ok, Client2 = {Pid3, Pid4}} = ecpool_worker:client(Worker),
|
||||
true = is_process_alive(Pid3),
|
||||
true = is_process_alive(Pid4),
|
||||
?assert(Client2 =/= Client)
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
|
||||
|
||||
t_multiprocess_client_not_restart(_Config) ->
|
||||
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, false}, {multiprocess, true}]),
|
||||
?assertEqual(4, length(ecpool:workers(?POOL))),
|
||||
%% check client status
|
||||
[begin
|
||||
true = ecpool_worker:is_connected(Worker),
|
||||
{ok, {Pid1, Pid2}} = ecpool_worker:client(Worker),
|
||||
true = is_process_alive(Pid1),
|
||||
true = is_process_alive(Pid2)
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
|
||||
|
||||
%% stop all the clients
|
||||
[begin
|
||||
true = ecpool_worker:is_connected(Worker),
|
||||
{ok, {Pid1, _Pid2}} = ecpool_worker:client(Worker),
|
||||
test_client:stop(Pid1, normal)
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)],
|
||||
ct:sleep(1500),
|
||||
|
||||
%% check that all the clients are disconnected and not restarted.
|
||||
[begin
|
||||
?assertEqual(false, ecpool_worker:is_connected(Worker))
|
||||
end || {_WorkerName, Worker} <- ecpool:workers(?POOL)].
|
||||
|
|
|
@ -35,7 +35,14 @@
|
|||
]).
|
||||
|
||||
connect(Opts) ->
|
||||
gen_server:start_link(?MODULE, [Opts], []).
|
||||
case proplists:get_value(multiprocess, Opts, false) of
|
||||
true ->
|
||||
{ok, Pid1} = gen_server:start_link(?MODULE, [Opts], []),
|
||||
{ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []),
|
||||
{ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}};
|
||||
false ->
|
||||
gen_server:start_link(?MODULE, [Opts], [])
|
||||
end.
|
||||
|
||||
stop(Pid, Reason) ->
|
||||
gen_server:call(Pid, {stop, Reason}).
|
||||
|
|
Loading…
Reference in New Issue