From 36fb68e5ed2ca5765069f744393b093e90e3ea8d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 26 Aug 2019 17:08:22 +0800 Subject: [PATCH 1/2] Update 'ecpool.app.src' and adopt new exports syntax --- src/ecpool.app.src | 9 ++++---- src/ecpool.erl | 33 +++++++++++++++++----------- src/ecpool_pool.erl | 16 +++++++------- src/ecpool_sup.erl | 6 +++--- src/ecpool_worker.erl | 50 +++++++++++++++++++++++++++---------------- 5 files changed, 67 insertions(+), 47 deletions(-) diff --git a/src/ecpool.app.src b/src/ecpool.app.src index 93b9f8293..daea2176b 100644 --- a/src/ecpool.app.src +++ b/src/ecpool.app.src @@ -1,15 +1,14 @@ {application, ecpool, - [ - {description, "Erlang Client/Connection Pool"}, + [{description, "Erlang Client/Connection Pool"}, {vsn, "git"}, {registered, []}, - {applications, [ - kernel, + {applications, [kernel, stdlib, gproc ]}, - {mod, { ecpool_app, []}}, + {mod, {ecpool_app, []}}, {env, []}, {licenses,["Apache-2.0"]}, + {maintainers, ["Feng Lee "]}, {links,[{"Github","https://github.com/emqx/ecpool"}]} ]}. diff --git a/src/ecpool.erl b/src/ecpool.erl index 3b26e63f0..2d73ad002 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -16,27 +16,36 @@ -module(ecpool). --export([pool_spec/4, start_pool/3, start_sup_pool/3, stop_sup_pool/1, - get_client/1, get_client/2, with_client/2, with_client/3, - set_reconnect_callback/2, - name/1, workers/1]). +-export([ pool_spec/4 + , start_pool/3 + , start_sup_pool/3 + , stop_sup_pool/1 + , get_client/1 + , get_client/2 + , with_client/2 + , with_client/3 + , name/1 + , workers/1 + ]). --export_type([pool_name/0, - pool_type/0, - option/0 +-export([set_reconnect_callback/2]). + +-export_type([ pool_name/0 + , pool_type/0 + , option/0 ]). --type pool_name() :: term(). +-type(pool_name() :: term()). --type pool_type() :: random | hash | round_robin. +-type(pool_type() :: random | hash | round_robin). --type reconn_callback() :: {fun((pid()) -> term())}. +-type(reconn_callback() :: {fun((pid()) -> term())}). --type option() :: {pool_size, pos_integer()} +-type(option() :: {pool_size, pos_integer()} | {pool_type, pool_type()} | {auto_reconnect, false | pos_integer()} | {on_reconnect, reconn_callback()} - | tuple(). + | tuple()). pool_spec(ChildId, Pool, Mod, Opts) -> #{id => ChildId, diff --git a/src/ecpool_pool.erl b/src/ecpool_pool.erl index 25e2e6ea4..2d44148f9 100644 --- a/src/ecpool_pool.erl +++ b/src/ecpool_pool.erl @@ -18,22 +18,22 @@ -behaviour(gen_server). --import(proplists, [get_value/3]). - %% API Function Exports -export([start_link/2]). -export([info/1]). %% gen_server Function Exports --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 ]). +-import(proplists, [get_value/3]). + -record(state, {name, size, type}). %%-------------------------------------------------------------------- diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index c664dba97..b53954433 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -21,9 +21,9 @@ -export([start_link/0]). %% API --export([start_pool/3, - stop_pool/1, - get_pool/1 +-export([ start_pool/3 + , stop_pool/1 + , get_pool/1 ]). -export([pools/0]). diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 1ed7a05ef..760fea0de 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -21,18 +21,30 @@ -export([start_link/4]). %% API Function Exports --export([client/1, is_connected/1, set_reconnect_callback/2]). - -%% gen_server Function Exports --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 +-export([ client/1 + , is_connected/1 + , set_reconnect_callback/2 ]). --record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}). +%% gen_server Function Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, { + pool :: ecpool:poo_name(), + id :: pos_integer(), + client :: pid(), + mod :: module(), + on_reconnect :: ecpool:reconn_callback(), + on_disconnect :: ecpool:reconn_callback(), + supervisees = [], + opts :: proplists:proplist() + }). %%-------------------------------------------------------------------- %% Callback @@ -84,15 +96,18 @@ set_reconnect_callback(Pid, OnReconnect) -> init([Pool, Id, Mod, Opts]) -> process_flag(trap_exit, true), - State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts, + State = #state{pool = Pool, + id = Id, + mod = Mod, + opts = Opts, on_reconnect = proplists:get_value(on_reconnect, Opts), - on_disconnect = proplists:get_value(on_disconnect, Opts)}, + on_disconnect = proplists:get_value(on_disconnect, Opts) + }, case connect_internal(State) of - {ok, NewState} -> + {ok, NewState} -> gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), {ok, NewState}; - {error, Error} -> - {stop, Error} + {error, Error} -> {stop, Error} end. handle_call(is_connected, _From, State = #state{client = Client}) -> @@ -165,10 +180,6 @@ connopts([{pool_type, _}|Opts], Acc) -> connopts(Opts, Acc); connopts([{auto_reconnect, _}|Opts], Acc) -> connopts(Opts, Acc); -connopts([{bind, _}|Opts], Acc) -> - connopts(Opts, Acc); -connopts([{unbind, _}|Opts], Acc) -> - connopts(Opts, Acc); connopts([Opt|Opts], Acc) -> connopts(Opts, [Opt|Acc]). @@ -203,3 +214,4 @@ connect_internal(State) -> catch _C:Reason -> {error, Reason} end. + From ab7ec8cfd1b5948f89786d3596c8f7c3065a63df Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 6 Sep 2019 09:28:00 +0800 Subject: [PATCH 2/2] Revert wrong fix (#16) Revert wrong fix of ecpool_worker:isconnected --- src/ecpool_worker.erl | 7 ++-- test/ecpool_SUITE.erl | 84 +++++++++++++++++++++++++++++++++++-------- test/test_client.erl | 9 ++++- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 760fea0de..3e919bd8b 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -38,7 +38,7 @@ -record(state, { pool :: ecpool:poo_name(), id :: pos_integer(), - client :: pid(), + client :: pid() | undefined, mod :: module(), on_reconnect :: ecpool:reconn_callback(), on_disconnect :: ecpool:reconn_callback(), @@ -110,10 +110,13 @@ init([Pool, Id, Mod, Opts]) -> {error, Error} -> {stop, Error} end. -handle_call(is_connected, _From, State = #state{client = Client}) -> +handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) -> IsAlive = Client =/= undefined andalso is_process_alive(Client), {reply, IsAlive, State}; +handle_call(is_connected, _From, State = #state{client = Client}) -> + {reply, Client =/= undefined, State}; + handle_call(client, _From, State = #state{client = undefined}) -> {reply, {error, disconnected}, State}; diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 9b8a444cb..b9115ace0 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -17,26 +17,28 @@ -module(ecpool_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). -define(POOL, test_pool). --define(POOL_OPTS, [ - %% schedulers number - {pool_size, 10}, - %% round-robbin | random | hash - {pool_type, random}, - %% false | pos_integer() - {auto_reconnect, false}, +-define(POOL_OPTS, + [%% schedulers number + {pool_size, 10}, + %% round-robbin | random | hash + {pool_type, random}, + %% false | pos_integer() + {auto_reconnect, false}, - %% DB Parameters - {host, "localhost"}, - {port, 5432}, - {username, "feng"}, - {password, ""}, - {database, "mqtt"}, - {encoding, utf8}]). + %% DB Parameters + {host, "localhost"}, + {port, 5432}, + {username, "feng"}, + {password, ""}, + {database, "mqtt"}, + {encoding, utf8} + ]). all() -> [{group, all}]. @@ -46,7 +48,9 @@ groups() -> [t_start_pool, t_start_sup_pool, t_restart_client, - t_reconnect_client + t_reconnect_client, + t_multiprocess_client, + t_multiprocess_client_not_restart ]}]. init_per_suite(Config) -> @@ -107,3 +111,53 @@ t_reconnect_client(_Config) -> timer:sleep(1100), ?assertEqual(4, length(ecpool:workers(?POOL))). +t_multiprocess_client(_Config) -> + ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}, {multiprocess, true}]), + ?assertEqual(4, length(ecpool:workers(?POOL))), + %% check client status + [begin + true = ecpool_worker:is_connected(Worker), + {ok, _C = {Pid1, Pid2}} = ecpool_worker:client(Worker), + true = is_process_alive(Pid1), + true = is_process_alive(Pid2) + end || {_WorkerName, Worker} <- ecpool:workers(?POOL)], + + %% stop one of the clients + Client = ecpool:with_client(?POOL, + fun(C = {P1, _P2}) -> + test_client:stop(P1, normal), C + end), + ct:sleep(1500), + + %% check that client is reconnected + [begin + true = ecpool_worker:is_connected(Worker), + {ok, Client2 = {Pid3, Pid4}} = ecpool_worker:client(Worker), + true = is_process_alive(Pid3), + true = is_process_alive(Pid4), + ?assert(Client2 =/= Client) + end || {_WorkerName, Worker} <- ecpool:workers(?POOL)]. + +t_multiprocess_client_not_restart(_Config) -> + ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, false}, {multiprocess, true}]), + ?assertEqual(4, length(ecpool:workers(?POOL))), + %% check client status + [begin + true = ecpool_worker:is_connected(Worker), + {ok, {Pid1, Pid2}} = ecpool_worker:client(Worker), + true = is_process_alive(Pid1), + true = is_process_alive(Pid2) + end || {_WorkerName, Worker} <- ecpool:workers(?POOL)], + + %% stop all the clients + [begin + true = ecpool_worker:is_connected(Worker), + {ok, {Pid1, _Pid2}} = ecpool_worker:client(Worker), + test_client:stop(Pid1, normal) + end || {_WorkerName, Worker} <- ecpool:workers(?POOL)], + ct:sleep(1500), + + %% check that all the clients are disconnected and not restarted. + [begin + ?assertEqual(false, ecpool_worker:is_connected(Worker)) + end || {_WorkerName, Worker} <- ecpool:workers(?POOL)]. diff --git a/test/test_client.erl b/test/test_client.erl index 272c82ddf..9da63753e 100644 --- a/test/test_client.erl +++ b/test/test_client.erl @@ -35,7 +35,14 @@ ]). connect(Opts) -> - gen_server:start_link(?MODULE, [Opts], []). + case proplists:get_value(multiprocess, Opts, false) of + true -> + {ok, Pid1} = gen_server:start_link(?MODULE, [Opts], []), + {ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []), + {ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}}; + false -> + gen_server:start_link(?MODULE, [Opts], []) + end. stop(Pid, Reason) -> gen_server:call(Pid, {stop, Reason}).