diff --git a/.gitignore b/.gitignore index 8e46d5a07..eb07ba1a4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ ebin rel/example_project .concrete/DEV_MODE .rebar +tmp diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..aa560d8ad --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ +.PHONY: deps test + +BASE_DIR = $(shell pwd) +REBAR=$(BASE_DIR)/rebar + +all: deps compile xref + +deps: + @$(REBAR) get-deps + +compile: + @$(REBAR) compile + +xref: + @$(REBAR) xref skip_deps=true + +clean: + @$(REBAR) clean + +test: + @$(REBAR) skip_deps=true eunit + +edoc: + @$(REBAR) doc + +PLT = $(BASE_DIR)/.ecpool_dialyzer.plt +APPS = erts kernel stdlib sasl crypto syntax_tools ssl public_key mnesia inets compiler + +check_plt: compile + dialyzer --check_plt --plt $(PLT) --apps $(APPS) deps/*/ebin ebin + +build_plt: compile + dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin ebin + +dialyzer: compile + dialyzer -Wno_return --plt $(PLT) deps/*/ebin ebin diff --git a/TODO b/TODO new file mode 100644 index 000000000..0a5c48056 --- /dev/null +++ b/TODO @@ -0,0 +1,8 @@ + +worker -> ecql + +integrage with ecql + +management api + +unit tests diff --git a/rebar b/rebar new file mode 100755 index 000000000..c2b7e2022 Binary files /dev/null and b/rebar differ diff --git a/rebar.config b/rebar.config new file mode 100644 index 000000000..671b78cf8 --- /dev/null +++ b/rebar.config @@ -0,0 +1,31 @@ +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% ex: ts=4 sw=4 ft=erlang et + +{require_min_otp_vsn, "R17"}. + +%warnings_as_errors, warn_untyped_record, +{erl_opts, [ + warn_export_all, + warn_unused_import, + {i, "include"}, + {src_dirs, ["src"]} + ]}. + +{xref_checks, [undefined_function_calls]}. +{cover_enabled, true}. +{cover_print_enabled, true}. + +{edoc_opts, [{dialyzer_specs, all}, + {report_missing_type, true}, + {report_type_mismatch, true}, + {pretty_print, erl_pp}, + {preprocess, true}]}. + +{validate_app_modules, true}. + +{eunit_opts, [verbose]}. + +{deps, [ + {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}} +]}. + diff --git a/src/ecpool.app.src b/src/ecpool.app.src new file mode 100644 index 000000000..db637ed4e --- /dev/null +++ b/src/ecpool.app.src @@ -0,0 +1,13 @@ +{application, ecpool, + [ + {description, "Erlang Client/Connection Pool"}, + {vsn, "0.1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + gproc + ]}, + {mod, { ecpool_app, []}}, + {env, []} + ]}. diff --git a/src/ecpool.erl b/src/ecpool.erl new file mode 100644 index 000000000..8fc2c7d24 --- /dev/null +++ b/src/ecpool.erl @@ -0,0 +1,53 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc ecpool API. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool). + +-export([name/1, start_pool/3, with_client/2, with_client/3, stop_pool/1]). + +name(Pool) -> + {?MODULE, Pool}. + +start_pool(Pool, Mod, Opts) when is_atom(Pool) -> + ecpool_pool_sup:start_link(Pool, Mod, Opts). + +stop_pool(Pool) when is_atom(Pool) -> + ecpool_sup:stop_pool(Pool). + +with_client(Pool, Fun) when is_atom(Pool) -> + Worker = gproc_pool:pick_worker({?MODULE, Pool}), + with_worker(Worker, Fun). + +with_client(Pool, Key, Fun) when is_atom(Pool) -> + Worker = gproc_pool:pick_worker({?MODULE, Pool}, Key), + with_worker(Worker, Fun). + +with_worker(Worker, Fun) -> + case ecpool_worker:client(Worker) of + {ok, Client} -> Fun(Client); + {error, Reason} -> {error, Reason} + end. + diff --git a/src/ecpool_app.erl b/src/ecpool_app.erl new file mode 100644 index 000000000..2f903528d --- /dev/null +++ b/src/ecpool_app.erl @@ -0,0 +1,39 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc Erlang Connection/Client Pool Application. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + ecpool_sup:start_link(). + +stop(_State) -> + ok. + diff --git a/src/ecpool_pool.erl b/src/ecpool_pool.erl new file mode 100644 index 000000000..c55f38e9f --- /dev/null +++ b/src/ecpool_pool.erl @@ -0,0 +1,105 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc Wrap gproc_pool. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_pool). + +-behaviour(gen_server). + +-import(proplists, [get_value/3]). + +%% API Function Exports +-export([start_link/2, info/1]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {name, size, type}). + +%%%============================================================================= +%%% API +%%%============================================================================= + +start_link(Pool, Opts) -> + gen_server:start_link(?MODULE, [Pool, Opts], []). + +info(Pid) -> + gen_server:call(Pid, info). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +init([Pool, Opts]) -> + Schedulers = erlang:system_info(schedulers), + PoolSize = get_value(pool_size, Opts, Schedulers), + PoolType = get_value(pool_type, Opts, random), + ensure_pool(ecpool:name(Pool), PoolType, [{size, PoolSize}]), + lists:foreach(fun(I) -> + ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I) + end, lists:seq(1, PoolSize)), + {ok, #state{name = Pool, size = PoolSize, type = PoolType}}. + +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + +ensure_pool_worker(Pool, Name, Slot) -> + try gproc_pool:add_worker(Pool, Name, Slot) + catch + error:exists -> ok + end. + +handle_call(info, _From, State = #state{name = Pool, size = Size, + type = Type}) -> + Workers = gproc_pool:active_workers(ecpool:name(Pool)), + Info = [{pool_name, Pool}, {pool_size, Size}, + {pool_type, Type}, {workers, Workers}], + {reply, Info, State}; + +handle_call(_Request, _From, State) -> + {reply, {error, unexpected_req}, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{name = Pool, size = Size}) -> + lists:foreach(fun(I) -> + gproc_pool:remove_worker(ecpool:name(Pool), {Pool, I}) + end, lists:seq(1, Size)), + gproc_pool:delete(ecpool:name(Pool)). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl new file mode 100644 index 000000000..bff48d5ea --- /dev/null +++ b/src/ecpool_pool_sup.erl @@ -0,0 +1,46 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc ecpool pool supervisor. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_pool_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/3]). + +%% Supervisor callbacks +-export([init/1]). + +start_link(Pool, Mod, Opts) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). + +init([Pool, Mod, Opts]) -> + {ok, { {one_for_all, 10, 100}, [ + {pool, {ecpool_pool, start_link, [Pool, Opts]}, + transient, 16#ffff, worker, [ecpool_pool]}, + {worker_sup, {ecpool_worker_sup, start_link, [Pool, Mod, Opts]}, + transient, infinity, supervisor, [ecpool_worker_sup]}] }}. + diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl new file mode 100644 index 000000000..196f54fc1 --- /dev/null +++ b/src/ecpool_sup.erl @@ -0,0 +1,83 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc ecpool supervisor. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_pool/3, stop_pool/1, pools/0, pool/1]). + +%% Supervisor callbacks +-export([init/1]). + +%% @doc Start supervisor. +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_pool(Pool, Mod, Opts) when is_atom(Pool) -> + supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)). + +-spec stop_pool(Pool :: atom()) -> ok | {error, any()}. +stop_pool(Pool) when is_atom(Pool) -> + ChildId = child_id(Pool), + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> + supervisor:delete_child(?MODULE, ChildId); + {error, Reason} -> + {error, Reason} + end. + +%% @doc All Pools supervisored by ecpool_sup. +-spec pools() -> [{atom(), pid()}]. +pools() -> + [{Pool, Pid} || {{pool_sup, Pool}, Pid, supervisor, _} + <- supervisor:which_children(?MODULE)]. + +%% @doc Find a pool. +-spec pool(atom()) -> undefined | pid(). +pool(Pool) when is_atom(Pool) -> + ChildId = child_id(Pool), + case [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(?MODULE), Id =:= ChildId] of + [] -> undefined; + L -> hd(L) + end. + +%%%============================================================================= +%%% Supervisor callbacks +%%%============================================================================= + +init([]) -> + {ok, { {one_for_one, 10, 100}, []} }. + +pool_spec(Pool, Mod, Opts) -> + {child_id(Pool), + {ecpool_pool_sup, start_link, [Pool, Mod, Opts]}, + transient, infinity, supervisor, [ecpool_pool_sup]}. + +child_id(Pool) -> {pool_sup, Pool}. + diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl new file mode 100644 index 000000000..a7e4088ed --- /dev/null +++ b/src/ecpool_worker.erl @@ -0,0 +1,143 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015-2016 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc ecpool worker. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_worker). + +-behaviour(gen_server). + +%% API Function Exports +-export([start_link/4, client/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {pool, id, client, mod, opts}). + +%%%============================================================================= +%%% Callback +%%%============================================================================= + +-ifdef(use_specs). + +-callback connect(ConnOpts :: list()) -> {ok, pid()} | {error, any()}. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{connect, 1}]; + +behaviour_info(_Other) -> + undefined. + +-endif. + +%%%============================================================================= +%%% API +%%%============================================================================= + +%% @doc Start the pool worker. +-spec start_link(atom(), pos_integer(), module(), list()) -> + {ok, pid()} | ignore | {error, any()}. +start_link(Pool, Id, Mod, Opts) -> + gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []). + +%% @doc Get client/connection. +-spec client(pid()) -> undefined | pid(). +client(Pid) -> gen_server:call(Pid, client, infinity). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +init([Pool, Id, Mod, Opts]) -> + process_flag(trap_exit, true), + State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts}, + case connect(State) of + {ok, Client} -> + gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}), + {ok, State#state{client = Client}}; + {error, Error} -> + {stop, Error} + end. + +handle_call(client, _From, State = #state{client = undefined}) -> + {reply, {error, disconnected}, State}; + +handle_call(client, _From, State = #state{client = Client}) -> + {reply, {ok, Client}, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) -> + case proplists:get_value(auto_reconnect, Opts, false) of + false -> + {stop, Reason, State}; + Secs -> + erlang:send_after(timer:seconds(Secs), self(), reconnect), + {noreply, State#state{client = undefined}} + end; + +handle_info(reconnect, State = #state{opts = Opts}) -> + case connect(State) of + {ok, Client} -> + {noreply, State#state{client = Client}}; + {error, _Error} -> + Secs = proplists:get_value(auto_reconnect, Opts), + erlang:send_after(timer:seconds(Secs), self(), reconnect), + {noreply, State#state{client = undefined}} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal Functions +%%%============================================================================= + +connect(#state{mod = Mod, opts = Opts}) -> + Mod:connect(connopts(Opts, [])). + +connopts([], Acc) -> + Acc; +connopts([{pool_size, _} | Opts], Acc) -> + connopts(Opts, Acc); +connopts([{pool_type, _} | Opts], Acc) -> + connopts(Opts, Acc); +connopts([{auto_reconnect, _} | Opts], Acc) -> + connopts(Opts, Acc); +connopts([Opt | Opts], Acc) -> + connopts(Opts, [Opt | Acc]). + diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl new file mode 100644 index 000000000..2d50bd7e9 --- /dev/null +++ b/src/ecpool_worker_sup.erl @@ -0,0 +1,51 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc ecpool worker supervisor. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- + +-module(ecpool_worker_sup). + +-behaviour(supervisor). + +-export([start_link/3, workers/1, init/1]). + +start_link(Pool, Mod, Opts) when is_atom(Pool) -> + supervisor:start_link(?MODULE, [Pool, Mod, Opts]). + +workers(WorkerSup) -> + [{ChildId, Pid} || {ChildId, Pid, worker, _} + <- supervisor:which_children(WorkerSup)]. + +init([Pool, Mod, Opts]) -> + WorkerSpec = fun(Id) -> + {{worker, Id}, {ecpool_worker, start_link, [Pool, Id, Mod, Opts]}, + transient, 5000, worker, [ecpool_worker]} + end, + Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))], + {ok, { {one_for_one, 10, 60}, Workers} }. + +pool_size(Opts) -> + Schedulers = erlang:system_info(schedulers), + proplists:get_value(pool_size, Opts, Schedulers). + diff --git a/test/ecpool_test.erl b/test/ecpool_test.erl new file mode 100644 index 000000000..294c3243b --- /dev/null +++ b/test/ecpool_test.erl @@ -0,0 +1,32 @@ + +-module(ecpool_test). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(POOL_OPTS, [ + %% schedulers number + {pool_size, 10}, + %% round-robbin | random | hash + {pool_type, random}, + %% false | pos_integer() + {auto_reconnect, false}, + + {host, "localhost"}, + {port, 5432}, + {username, "feng"}, + {password, ""}, + {database, "mqtt"}, + {encoding, utf8}]). + +pool_test() -> + application:start(gproc), + application:start(ecpool), + ecpool:start_pool(test_pool, test_client, ?POOL_OPTS). + %%ecpool:stop_pool(test_pool), + %%application:stop(ecpool), + %%application:stop(gproc). + + +-endif. diff --git a/test/test_client.erl b/test/test_client.erl new file mode 100644 index 000000000..7864bb4ad --- /dev/null +++ b/test/test_client.erl @@ -0,0 +1,54 @@ + +-module(test_client). + +-behaviour(ecpool_worker). + +-behaviour(gen_server). +-define(SERVER, ?MODULE). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ + +-export([connect/1]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ + +connect(Opts) -> + gen_server:start_link(?MODULE, [Opts], []). + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ + +init(Args) -> + {ok, Args}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ +