Merge pull request #13 from emqx/develop

Update README and Makefile
This commit is contained in:
Feng Lee 2019-08-26 16:42:26 +08:00 committed by GitHub
commit 448820e709
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 389 additions and 403 deletions

3
.gitignore vendored
View File

@ -10,4 +10,5 @@ rel/example_project
.rebar .rebar
tmp tmp
_build/ _build/
rebar rebar.lock
logs/

20
.travis.yml Normal file
View File

@ -0,0 +1,20 @@
language: erlang
otp_release:
- 22.0
- 21.3
before_install:
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..
script:
- make compile
- make xref
- make ct
- make cover
- make dialyzer
after_success:
- make coveralls
sudo: false

20
CHANGES
View File

@ -1,20 +0,0 @@
0.2.1-beta (2016/04/13)
-----------------------
Update Copyright
Spec Syntax
Catch exceptions when reconnecting
0.2-beta (2016/01/02)
---------------------
Eunit tests and first release.
0.1-beta (2015/12/30)
---------------------
First commit.

View File

@ -1,7 +1,6 @@
.PHONY: deps test .PHONY: deps test
BASE_DIR = $(shell pwd) REBAR=rebar3
REBAR=$(BASE_DIR)/rebar
all: deps compile xref all: deps compile xref
@ -12,25 +11,19 @@ compile:
@$(REBAR) compile @$(REBAR) compile
xref: xref:
@$(REBAR) xref skip_deps=true @$(REBAR) xref
clean: clean:
@$(REBAR) clean @$(REBAR) clean
test: ct:
@$(REBAR) skip_deps=true eunit @$(REBAR) ct
cover:
@$(REBAR) cover
edoc: edoc:
@$(REBAR) doc @$(REBAR) edoc
PLT = $(BASE_DIR)/.ecpool_dialyzer.plt
APPS = erts kernel stdlib sasl crypto syntax_tools ssl public_key mnesia inets compiler
check_plt: compile
dialyzer --check_plt --plt $(PLT) --apps $(APPS) deps/*/ebin ebin
build_plt: compile
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin ebin
dialyzer: compile dialyzer: compile
dialyzer -Wno_return --plt $(PLT) deps/*/ebin ebin @$(REBAR) dialyzer

View File

@ -1,16 +1,15 @@
# Erlang Connection/Client Pool # Erlang Connection/Client Pool
ecpool is different with worker-pool libraries in that it is designed to pool connection/clients to server or database. `ecpool` is different with worker-pool libraries in that it is designed to pool connection/clients to server or database.
ecpool tries to avoid the erlang application crash when the server or database going down.
`ecpool` tries to avoid the erlang application crash when the server or database going down.
## Overview ## Overview
A pool worker to manage/monitor the client to server or database: A pool worker to manage/monitor the connection to server or database:
``` ```
PoolWorker -> Client -> DB PoolWorker -> Conn -> DB
``` ```
Use client: Use client:
@ -19,7 +18,6 @@ Use client:
ecpool:with_client(Pool, fun(Client) -> call(Client) end). ecpool:with_client(Pool, fun(Client) -> call(Client) end).
``` ```
## Usage ## Usage
### Start the pool ### Start the pool
@ -47,7 +45,6 @@ PgOpts = [%% Pool Size
{encoding, utf8}], {encoding, utf8}],
ecpool:start_pool(epgsql_pool, epgsql_pool_client, PgOpts) ecpool:start_pool(epgsql_pool, epgsql_pool_client, PgOpts)
``` ```
### The Callback Module ### The Callback Module
@ -77,7 +74,7 @@ squery(Pool, Sql) ->
## Design ## Design
The ecpool supervisor tree: The `ecpool` supervisor tree:
``` ```
pool_sup[one_for_all supervisor] pool_sup[one_for_all supervisor]
@ -92,9 +89,8 @@ pool_sup[one_for_all supervisor]
Feng Lee <feng@emqx.io> Feng Lee <feng@emqx.io>
## License ## License
The MIT License (MIT) The Apache License Version 2.0

8
TODO
View File

@ -1,8 +0,0 @@
worker -> ecql
integrage with ecql
management api
unit tests

View File

@ -1,11 +1,10 @@
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- %% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ts=4 sw=4 ft=erlang et %% ex: ts=4 sw=4 ft=erlang et
{require_min_otp_vsn, "R17"}. {require_min_otp_vsn, "R20"}.
%warnings_as_errors, warn_untyped_record, %warnings_as_errors, warn_untyped_record,
{erl_opts, [ {erl_opts, [warn_export_all,
warn_export_all,
warn_unused_import, warn_unused_import,
{i, "include"}, {i, "include"},
{src_dirs, ["src"]} {src_dirs, ["src"]}

View File

@ -1,4 +1,6 @@
[{<<"gproc">>, {"1.1.0",
{git,"git://github.com/uwiger/gproc.git", [{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0}]}.
{ref,"b7b0748d7adaf9b2243921d7e9cf320690eb0544"}}, [
0}]. {pkg_hash,[
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>}]}
].

View File

@ -1,28 +1,18 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc ecpool Main API.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool). -module(ecpool).
@ -31,6 +21,13 @@
set_reconnect_callback/2, set_reconnect_callback/2,
name/1, workers/1]). name/1, workers/1]).
-export_type([pool_name/0,
pool_type/0,
option/0
]).
-type pool_name() :: term().
-type pool_type() :: random | hash | round_robin. -type pool_type() :: random | hash | round_robin.
-type reconn_callback() :: {fun((pid()) -> term())}. -type reconn_callback() :: {fun((pid()) -> term())}.
@ -42,11 +39,15 @@
| tuple(). | tuple().
pool_spec(ChildId, Pool, Mod, Opts) -> pool_spec(ChildId, Pool, Mod, Opts) ->
{ChildId, {?MODULE, start_pool, [Pool, Mod, Opts]}, #{id => ChildId,
permanent, 5000, supervisor, [ecpool_pool_sup]}. start => {?MODULE, start_pool, [Pool, Mod, Opts]},
restart => permanent,
shutdown => 5000,
type => supervisor,
modules => [ecpool_pool_sup]}.
%% @doc Start the pool %% @doc Start the pool sup.
-spec(start_pool(atom(), atom(), [option()]) -> {ok, pid()} | {error, any()}). -spec(start_pool(atom(), atom(), [option()]) -> {ok, pid()} | {error, term()}).
start_pool(Pool, Mod, Opts) when is_atom(Pool) -> start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
ecpool_pool_sup:start_link(Pool, Mod, Opts). ecpool_pool_sup:start_link(Pool, Mod, Opts).
@ -75,24 +76,26 @@ set_reconnect_callback(Pool, Callback) ->
ok. ok.
%% @doc Call the fun with client/connection %% @doc Call the fun with client/connection
-spec(with_client(atom(), fun((Client :: pid()) -> any())) -> any()). -spec(with_client(atom(), fun((Client :: pid()) -> any())) -> no_return()).
with_client(Pool, Fun) when is_atom(Pool) -> with_client(Pool, Fun) when is_atom(Pool) ->
with_worker(gproc_pool:pick_worker(name(Pool)), Fun). with_worker(gproc_pool:pick_worker(name(Pool)), Fun).
%% @doc Call the fun with client/connection %% @doc Call the fun with client/connection
-spec(with_client(atom(), any(), fun((Client :: pid()) -> any())) -> any()). -spec(with_client(atom(), any(), fun((Client :: pid()) -> term())) -> no_return()).
with_client(Pool, Key, Fun) when is_atom(Pool) -> with_client(Pool, Key, Fun) when is_atom(Pool) ->
with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun). with_worker(gproc_pool:pick_worker(name(Pool), Key), Fun).
-spec(with_worker(Worker :: pid(), fun((Client :: pid()) -> any())) -> no_return()).
with_worker(Worker, Fun) -> with_worker(Worker, Fun) ->
case ecpool_worker:client(Worker) of case ecpool_worker:client(Worker) of
{ok, Client} -> Fun(Client); {ok, Client} -> Fun(Client);
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
%% @doc Pool workers
workers(Pool) ->
gproc_pool:active_workers(name(Pool)).
%% @doc ecpool name %% @doc ecpool name
name(Pool) -> {?MODULE, Pool}. name(Pool) -> {?MODULE, Pool}.
%% @doc pool workers
workers(Pool) -> gproc_pool:active_workers(name(Pool)).

View File

@ -1,28 +1,18 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc Erlang Connection/Client Pool Application.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_app). -module(ecpool_app).

View File

@ -1,28 +1,18 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc Wrap gproc_pool.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_pool). -module(ecpool_pool).
@ -31,39 +21,47 @@
-import(proplists, [get_value/3]). -import(proplists, [get_value/3]).
%% API Function Exports %% API Function Exports
-export([start_link/2, info/1]). -export([start_link/2]).
-export([info/1]).
%% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------ -export([init/1,
handle_call/3,
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, handle_cast/2,
terminate/2, code_change/3]). handle_info/2,
terminate/2,
code_change/3
]).
-record(state, {name, size, type}). -record(state, {name, size, type}).
%%%============================================================================= %%--------------------------------------------------------------------
%%% API %% API
%%%============================================================================= %%--------------------------------------------------------------------
-spec(start_link(ecpool:pool_name(), list(ecpool:option()))
-> {ok, pid()} | {error, term()}).
start_link(Pool, Opts) -> start_link(Pool, Opts) ->
gen_server:start_link(?MODULE, [Pool, Opts], []). gen_server:start_link(?MODULE, [Pool, Opts], []).
-spec(info(pid()) -> list()).
info(Pid) -> info(Pid) ->
gen_server:call(Pid, info). gen_server:call(Pid, info).
%%%============================================================================= %%--------------------------------------------------------------------
%%% gen_server callbacks %% gen_server callbacks
%%%============================================================================= %%--------------------------------------------------------------------
init([Pool, Opts]) -> init([Pool, Opts]) ->
Schedulers = erlang:system_info(schedulers), Schedulers = erlang:system_info(schedulers),
PoolSize = get_value(pool_size, Opts, Schedulers), PoolSize = get_value(pool_size, Opts, Schedulers),
PoolType = get_value(pool_type, Opts, random), PoolType = get_value(pool_type, Opts, random),
ensure_pool(ecpool:name(Pool), PoolType, [{size, PoolSize}]), ok = ensure_pool(ecpool:name(Pool), PoolType, [{size, PoolSize}]),
lists:foreach(fun(I) -> ok = lists:foreach(
ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I) fun(I) ->
end, lists:seq(1, PoolSize)), ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I)
end, lists:seq(1, PoolSize)),
{ok, #state{name = Pool, size = PoolSize, type = PoolType}}. {ok, #state{name = Pool, size = PoolSize, type = PoolType}}.
ensure_pool(Pool, Type, Opts) -> ensure_pool(Pool, Type, Opts) ->
@ -79,23 +77,30 @@ ensure_pool_worker(Pool, Name, Slot) ->
end. end.
handle_call(info, _From, State = #state{name = Pool, size = Size, type = Type}) -> handle_call(info, _From, State = #state{name = Pool, size = Size, type = Type}) ->
Info = [{pool_name, Pool}, {pool_size, Size}, Workers = ecpool:workers(Pool),
{pool_type, Type}, {workers, ecpool:workers(Pool)}], Info = [{pool_name, Pool},
{pool_size, Size},
{pool_type, Type},
{workers, Workers}],
{reply, Info, State}; {reply, Info, State};
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, {error, unexpected_req}, State}. logger:error("[Pool] unexpected request: ~p", [Req]),
{reply, ignored, State}.
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
logger:error("[Pool] unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(_Info, State) -> handle_info(Info, State) ->
logger:error("[Pool] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{name = Pool, size = Size}) -> terminate(_Reason, #state{name = Pool, size = Size}) ->
lists:foreach(fun(I) -> lists:foreach(
gproc_pool:remove_worker(ecpool:name(Pool), {Pool, I}) fun(I) ->
end, lists:seq(1, Size)), gproc_pool:remove_worker(ecpool:name(Pool), {Pool, I})
end, lists:seq(1, Size)),
gproc_pool:delete(ecpool:name(Pool)). gproc_pool:delete(ecpool:name(Pool)).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->

View File

@ -1,28 +1,18 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc ecpool pool supervisor.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_pool_sup). -module(ecpool_pool_sup).

View File

@ -1,48 +1,52 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc ecpool supervisor.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_sup). -module(ecpool_sup).
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/0]).
%% API %% API
-export([start_link/0, start_pool/3, stop_pool/1, pools/0, pool/1]). -export([start_pool/3,
stop_pool/1,
get_pool/1
]).
-export([pools/0]).
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
%% @doc Start supervisor. %% @doc Start supervisor.
-spec(start_link() -> {ok, pid()} | {error, any()}). -spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------
%% Start/Stop a pool
%%--------------------------------------------------------------------
%% @doc Start a pool.
-spec(start_pool(atom(), atom(), list(tuple())) -> {ok, pid()} | {error, term()}).
start_pool(Pool, Mod, Opts) when is_atom(Pool) -> start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)). supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)).
-spec(stop_pool(Pool :: atom()) -> ok | {error, any()}). %% @doc Stop a pool.
-spec(stop_pool(Pool :: atom()) -> ok | {error, term()}).
stop_pool(Pool) when is_atom(Pool) -> stop_pool(Pool) when is_atom(Pool) ->
ChildId = child_id(Pool), ChildId = child_id(Pool),
case supervisor:terminate_child(?MODULE, ChildId) of case supervisor:terminate_child(?MODULE, ChildId) of
@ -52,32 +56,35 @@ stop_pool(Pool) when is_atom(Pool) ->
{error, Reason} {error, Reason}
end. end.
%% @doc All Pools supervisored by ecpool_sup. %% @doc Get a pool.
-spec(pools() -> [{atom(), pid()}]). -spec(get_pool(atom()) -> undefined | pid()).
pools() -> get_pool(Pool) when is_atom(Pool) ->
[{Pool, Pid} || {{pool_sup, Pool}, Pid, supervisor, _}
<- supervisor:which_children(?MODULE)].
%% @doc Find a pool.
-spec(pool(atom()) -> undefined | pid()).
pool(Pool) when is_atom(Pool) ->
ChildId = child_id(Pool), ChildId = child_id(Pool),
case [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(?MODULE), Id =:= ChildId] of case [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(?MODULE), Id =:= ChildId] of
[] -> undefined; [] -> undefined;
L -> hd(L) L -> hd(L)
end. end.
%%%============================================================================= %% @doc Get All Pools supervisored by the ecpool_sup.
%%% Supervisor callbacks -spec(pools() -> [{atom(), pid()}]).
%%%============================================================================= pools() ->
[{Pool, Pid} || {{pool_sup, Pool}, Pid, supervisor, _}
<- supervisor:which_children(?MODULE)].
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) -> init([]) ->
{ok, { {one_for_one, 10, 100}, []} }. {ok, { {one_for_one, 10, 100}, []} }.
pool_spec(Pool, Mod, Opts) -> pool_spec(Pool, Mod, Opts) ->
{child_id(Pool), #{id => child_id(Pool),
{ecpool_pool_sup, start_link, [Pool, Mod, Opts]}, start => {ecpool_pool_sup, start_link, [Pool, Mod, Opts]},
transient, infinity, supervisor, [ecpool_pool_sup]}. restart => transient,
shutdown => infinity,
type => supervisor,
modules => [ecpool_pool_sup]}.
child_id(Pool) -> {pool_sup, Pool}. child_id(Pool) -> {pool_sup, Pool}.

View File

@ -1,49 +1,47 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc ecpool worker.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_worker). -module(ecpool_worker).
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/4]).
%% API Function Exports %% API Function Exports
-export([start_link/4, client/1, is_connected/1, set_reconnect_callback/2]). -export([client/1, is_connected/1, set_reconnect_callback/2]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1,
terminate/2, code_change/3]). handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}). -record(state, {pool, id, client, mod, on_reconnect, on_disconnect, supervisees = [], opts}).
%%%============================================================================= %%--------------------------------------------------------------------
%%% Callback %% Callback
%%%============================================================================= %%--------------------------------------------------------------------
-ifdef(use_specs). -ifdef(use_specs).
-callback connect(ConnOpts :: list()) -> {ok, pid()} | {error, any()}. -callback(connect(ConnOpts :: list())
-> {ok, pid()} | {error, Reason :: term()}).
-else. -else.
@ -51,15 +49,14 @@
behaviour_info(callbacks) -> behaviour_info(callbacks) ->
[{connect, 1}]; [{connect, 1}];
behaviour_info(_Other) -> behaviour_info(_Other) ->
undefined. undefined.
-endif. -endif.
%%%============================================================================= %%--------------------------------------------------------------------
%%% API %% API
%%%============================================================================= %%--------------------------------------------------------------------
%% @doc Start a pool worker. %% @doc Start a pool worker.
-spec(start_link(atom(), pos_integer(), module(), list()) -> -spec(start_link(atom(), pos_integer(), module(), list()) ->
@ -68,22 +65,22 @@ start_link(Pool, Id, Mod, Opts) ->
gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []). gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []).
%% @doc Get client/connection. %% @doc Get client/connection.
-spec(client(pid()) -> undefined | pid()). -spec(client(pid()) -> {ok, Client :: pid()} | {error, Reason :: term()}).
client(Pid) -> client(Pid) ->
gen_server:call(Pid, client, infinity). gen_server:call(Pid, client, infinity).
%% @doc Is client connected? %% @doc Is client connected?
-spec(is_connected(pid()) -> boolean()). -spec(is_connected(pid()) -> boolean()).
is_connected(Pid) -> is_connected(Pid) ->
gen_server:call(Pid, is_connected). gen_server:call(Pid, is_connected, infinity).
-spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok). -spec(set_reconnect_callback(pid(), ecpool:reconn_callback()) -> ok).
set_reconnect_callback(Pid, OnReconnect) -> set_reconnect_callback(Pid, OnReconnect) ->
gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}). gen_server:cast(Pid, {set_reconn_callbk, OnReconnect}).
%%%============================================================================= %%--------------------------------------------------------------------
%%% gen_server callbacks %% gen_server callbacks
%%%============================================================================= %%--------------------------------------------------------------------
init([Pool, Id, Mod, Opts]) -> init([Pool, Id, Mod, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -98,18 +95,19 @@ init([Pool, Id, Mod, Opts]) ->
{stop, Error} {stop, Error}
end. end.
handle_call(is_connected, _From, State = #state{client = Client}) when is_pid(Client) ->
{reply, Client =/= undefined andalso is_process_alive(Client), State};
handle_call(is_connected, _From, State = #state{client = Client}) -> handle_call(is_connected, _From, State = #state{client = Client}) ->
{reply, Client =/= undefined, State}; IsAlive = Client =/= undefined andalso is_process_alive(Client),
{reply, IsAlive, State};
handle_call(client, _From, State = #state{client = undefined}) -> handle_call(client, _From, State = #state{client = undefined}) ->
{reply, {error, disconnected}, State}; {reply, {error, disconnected}, State};
handle_call(client, _From, State = #state{client = Client}) -> handle_call(client, _From, State = #state{client = Client}) ->
{reply, {ok, Client}, State}. {reply, {ok, Client}, State};
handle_call(Req, _From, State) ->
logger:error("[PoolWorker] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({set_reconn_callbk, OnReconnect}, State) -> handle_cast({set_reconn_callbk, OnReconnect}, State) ->
{noreply, State#state{on_reconnect = OnReconnect}}; {noreply, State#state{on_reconnect = OnReconnect}};
@ -125,7 +123,8 @@ handle_info({'EXIT', Pid, Reason}, State = #state{opts = Opts, supervisees = Sup
Secs -> reconnect(Secs, State) Secs -> reconnect(Secs, State)
end; end;
false -> false ->
logger:debug("~p received unexpected exit:~0p from ~p. Supervisees: ~p", [?MODULE, Reason, Pid, SupPids]), logger:debug("~p received unexpected exit:~0p from ~p. Supervisees: ~p",
[?MODULE, Reason, Pid, SupPids]),
{noreply, State} {noreply, State}
end; end;
@ -138,7 +137,8 @@ handle_info(reconnect, State = #state{opts = Opts, on_reconnect = OnReconnect})
reconnect(proplists:get_value(auto_reconnect, Opts), State) reconnect(proplists:get_value(auto_reconnect, Opts), State)
end; end;
handle_info(_Info, State) -> handle_info(Info, State) ->
logger:error("[PoolWorker] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id, terminate(_Reason, #state{pool = Pool, id = Id,
@ -150,23 +150,27 @@ terminate(_Reason, #state{pool = Pool, id = Id,
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%%%============================================================================= %%--------------------------------------------------------------------
%%% Internal Functions %% Internal Functions
%%%============================================================================= %%--------------------------------------------------------------------
connect(#state{mod = Mod, opts = Opts, id = Id}) -> connect(#state{mod = Mod, opts = Opts, id = Id}) ->
Mod:connect([{ecpool_worker_id, Id} | connopts(Opts, [])]). Mod:connect([{ecpool_worker_id, Id} | connopts(Opts, [])]).
connopts([], Acc) -> connopts([], Acc) ->
Acc; Acc;
connopts([{pool_size, _} | Opts], Acc) -> connopts([{pool_size, _}|Opts], Acc) ->
connopts(Opts, Acc); connopts(Opts, Acc);
connopts([{pool_type, _} | Opts], Acc) -> connopts([{pool_type, _}|Opts], Acc) ->
connopts(Opts, Acc); connopts(Opts, Acc);
connopts([{auto_reconnect, _} | Opts], Acc) -> connopts([{auto_reconnect, _}|Opts], Acc) ->
connopts(Opts, Acc); connopts(Opts, Acc);
connopts([Opt | Opts], Acc) -> connopts([{bind, _}|Opts], Acc) ->
connopts(Opts, [Opt | Acc]). connopts(Opts, Acc);
connopts([{unbind, _}|Opts], Acc) ->
connopts(Opts, Acc);
connopts([Opt|Opts], Acc) ->
connopts(Opts, [Opt|Acc]).
reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) -> reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supervisees = SubPids}) ->
[erlang:unlink(P) || P <- SubPids, is_pid(P)], [erlang:unlink(P) || P <- SubPids, is_pid(P)],

View File

@ -1,43 +1,39 @@
%%%----------------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Copyright (c) 2015-2016 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% %%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %% Licensed under the Apache License, Version 2.0 (the "License");
%%% of this software and associated documentation files (the "Software"), to deal %% you may not use this file except in compliance with the License.
%%% in the Software without restriction, including without limitation the rights %% You may obtain a copy of the License at
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%
%%% copies of the Software, and to permit persons to whom the Software is %% http://www.apache.org/licenses/LICENSE-2.0
%%% furnished to do so, subject to the following conditions: %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% The above copyright notice and this permission notice shall be included in all %% distributed under the License is distributed on an "AS IS" BASIS,
%%% copies or substantial portions of the Software. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% %% See the License for the specific language governing permissions and
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% limitations under the License.
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%--------------------------------------------------------------------
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc ecpool worker supervisor.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(ecpool_worker_sup). -module(ecpool_worker_sup).
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/3, init/1]). -export([start_link/3]).
-export([init/1]).
start_link(Pool, Mod, Opts) when is_atom(Pool) -> start_link(Pool, Mod, Opts) when is_atom(Pool) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts]). supervisor:start_link(?MODULE, [Pool, Mod, Opts]).
init([Pool, Mod, Opts]) -> init([Pool, Mod, Opts]) ->
WorkerSpec = fun(Id) -> WorkerSpec = fun(Id) ->
{{worker, Id}, {ecpool_worker, start_link, [Pool, Id, Mod, Opts]}, #{id => {worker, Id},
transient, 5000, worker, [ecpool_worker, Mod]} start => {ecpool_worker, start_link, [Pool, Id, Mod, Opts]},
end, restart => transient,
shutdown => 5000,
type => worker,
modules => [ecpool_worker, Mod]}
end,
Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))], Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))],
{ok, { {one_for_one, 10, 60}, Workers} }. {ok, { {one_for_one, 10, 60}, Workers} }.

View File

@ -1,29 +1,22 @@
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. %%--------------------------------------------------------------------
%%% %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Permission is hereby granted, free of charge, to any person obtaining a copy %%
%%% of this software and associated documentation files (the "Software"), to deal %% Licensed under the Apache License, Version 2.0 (the "License");
%%% in the Software without restriction, including without limitation the rights %% you may not use this file except in compliance with the License.
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %% You may obtain a copy of the License at
%%% copies of the Software, and to permit persons to whom the Software is %%
%%% furnished to do so, subject to the following conditions: %% http://www.apache.org/licenses/LICENSE-2.0
%%% %%
%%% The above copyright notice and this permission notice shall be included in all %% Unless required by applicable law or agreed to in writing, software
%%% copies or substantial portions of the Software. %% distributed under the License is distributed on an "AS IS" BASIS,
%%% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %% See the License for the specific language governing permissions and
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %% limitations under the License.
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE %%--------------------------------------------------------------------
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%
-module(ecpool_test). -module(ecpool_SUITE).
-ifdef(TEST). -compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -45,32 +38,36 @@
{database, "mqtt"}, {database, "mqtt"},
{encoding, utf8}]). {encoding, utf8}]).
pool_test_() -> all() ->
{foreach, [{group, all}].
fun() ->
application:start(gproc),
application:start(ecpool)
end,
fun(_) ->
application:stop(ecpool),
application:stop(gproc)
end,
[?_test(t_start_pool()),
?_test(t_start_sup_pool()),
?_test(t_restart_client()),
?_test(t_reconnect_client())]}.
t_start_pool() -> groups() ->
[{all, [sequence],
[t_start_pool,
t_start_sup_pool,
t_restart_client,
t_reconnect_client
]}].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(ecpool),
Config.
end_per_suite(_Config) ->
ok = application:stop(ecpool),
ok = application:stop(gproc).
t_start_pool(_Config) ->
ecpool:start_pool(?POOL, test_client, ?POOL_OPTS), ecpool:start_pool(?POOL, test_client, ?POOL_OPTS),
?assertEqual(10, length(ecpool:workers(test_pool))), ?assertEqual(10, length(ecpool:workers(test_pool))),
?debugFmt("~p~n", [ecpool:workers(test_pool)]), ?debugFmt("~p~n", [ecpool:workers(test_pool)]),
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
ecpool:with_client(?POOL, fun(Client) -> ecpool:with_client(?POOL, fun(Client) ->
?debugFmt("Call ~p: ~p~n", [I, Client]) ?debugFmt("Call ~p: ~p~n", [I, Client])
end) end)
end, lists:seq(1, 10)). end, lists:seq(1, 10)).
t_start_sup_pool() -> t_start_sup_pool(_Config) ->
{ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS), {ok, Pid1} = ecpool:start_sup_pool(xpool, test_client, ?POOL_OPTS),
{ok, Pid2} = ecpool:start_sup_pool(ypool, test_client, ?POOL_OPTS), {ok, Pid2} = ecpool:start_sup_pool(ypool, test_client, ?POOL_OPTS),
?assertEqual([{xpool, Pid1}, {ypool, Pid2}], lists:sort(ecpool_sup:pools())), ?assertEqual([{xpool, Pid1}, {ypool, Pid2}], lists:sort(ecpool_sup:pools())),
@ -78,7 +75,7 @@ t_start_sup_pool() ->
ecpool:stop_sup_pool(xpool), ecpool:stop_sup_pool(xpool),
?assertEqual([], ecpool_sup:pools()). ?assertEqual([], ecpool_sup:pools()).
t_restart_client() -> t_restart_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]),
?assertEqual(4, length(ecpool:workers(?POOL))), ?assertEqual(4, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, normal) end), ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, normal) end),
@ -87,22 +84,26 @@ t_restart_client() ->
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, {shutdown, x}) end), ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, {shutdown, x}) end),
?debugFmt("~n~p~n", [ecpool:workers(?POOL)]), ?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
?assertEqual(2, length(ecpool:workers(?POOL))), ?assertEqual(2, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, badarg) end), ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, badarg)
end),
timer:sleep(100), timer:sleep(100),
?debugFmt("~n~p~n", [ecpool:workers(?POOL)]), ?debugFmt("~n~p~n", [ecpool:workers(?POOL)]),
?assertEqual(2, length(ecpool:workers(?POOL))). ?assertEqual(2, length(ecpool:workers(?POOL))).
t_reconnect_client() -> t_reconnect_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}]), ecpool:start_pool(?POOL, test_client, [{pool_size, 4}, {auto_reconnect, 1}]),
?assertEqual(4, length(ecpool:workers(?POOL))), ?assertEqual(4, length(ecpool:workers(?POOL))),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, normal) end), ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, normal)
end),
?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])), ?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
timer:sleep(1100), timer:sleep(1100),
?assertNot(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])), ?assertNot(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
ecpool:with_client(?POOL, fun(Client) -> test_client:stop(Client, badarg) end), ecpool:with_client(?POOL, fun(Client) ->
test_client:stop(Client, {shutdown, badarg})
end),
?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])), ?assert(lists:member(false, [ecpool_worker:is_connected(Pid) || {_, Pid} <- ecpool:workers(?POOL)])),
timer:sleep(1100), timer:sleep(1100),
?assertEqual(4, length(ecpool:workers(?POOL))). ?assertEqual(4, length(ecpool:workers(?POOL))).
-endif.

View File

@ -1,27 +1,38 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(test_client). -module(test_client).
-behaviour(ecpool_worker). -behaviour(ecpool_worker).
-behaviour(gen_server). -behaviour(gen_server).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% ------------------------------------------------------------------ -export([connect/1,
%% API Function Exports stop/2
%% ------------------------------------------------------------------ ]).
-export([connect/1, stop/2]). -export([init/1,
handle_call/3,
%% ------------------------------------------------------------------ handle_cast/2,
%% gen_server Function Exports handle_info/2,
%% ------------------------------------------------------------------ terminate/2,
code_change/3
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, ]).
terminate/2, code_change/3]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
connect(Opts) -> connect(Opts) ->
gen_server:start_link(?MODULE, [Opts], []). gen_server:start_link(?MODULE, [Opts], []).
@ -29,9 +40,9 @@ connect(Opts) ->
stop(Pid, Reason) -> stop(Pid, Reason) ->
gen_server:call(Pid, {stop, Reason}). gen_server:call(Pid, {stop, Reason}).
%% ------------------------------------------------------------------ %%-----------------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %%-----------------------------------------------------------------------------
init(Args) -> init(Args) ->
{ok, Args}. {ok, Args}.
@ -39,7 +50,7 @@ init(Args) ->
handle_call({stop, Reason}, _From, State) -> handle_call({stop, Reason}, _From, State) ->
{stop, Reason, ok, State}; {stop, Reason, ok, State};
handle_call(_Request, _From, State) -> handle_call(_Req, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
@ -54,7 +65,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------