first commit
This commit is contained in:
parent
45e1764bc0
commit
ac24678d0e
|
@ -8,3 +8,4 @@ ebin
|
|||
rel/example_project
|
||||
.concrete/DEV_MODE
|
||||
.rebar
|
||||
tmp
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
.PHONY: deps test
|
||||
|
||||
BASE_DIR = $(shell pwd)
|
||||
REBAR=$(BASE_DIR)/rebar
|
||||
|
||||
all: deps compile xref
|
||||
|
||||
deps:
|
||||
@$(REBAR) get-deps
|
||||
|
||||
compile:
|
||||
@$(REBAR) compile
|
||||
|
||||
xref:
|
||||
@$(REBAR) xref skip_deps=true
|
||||
|
||||
clean:
|
||||
@$(REBAR) clean
|
||||
|
||||
test:
|
||||
@$(REBAR) skip_deps=true eunit
|
||||
|
||||
edoc:
|
||||
@$(REBAR) doc
|
||||
|
||||
PLT = $(BASE_DIR)/.ecpool_dialyzer.plt
|
||||
APPS = erts kernel stdlib sasl crypto syntax_tools ssl public_key mnesia inets compiler
|
||||
|
||||
check_plt: compile
|
||||
dialyzer --check_plt --plt $(PLT) --apps $(APPS) deps/*/ebin ebin
|
||||
|
||||
build_plt: compile
|
||||
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin ebin
|
||||
|
||||
dialyzer: compile
|
||||
dialyzer -Wno_return --plt $(PLT) deps/*/ebin ebin
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
worker -> ecql
|
||||
|
||||
integrage with ecql
|
||||
|
||||
management api
|
||||
|
||||
unit tests
|
|
@ -0,0 +1,31 @@
|
|||
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
|
||||
%% ex: ts=4 sw=4 ft=erlang et
|
||||
|
||||
{require_min_otp_vsn, "R17"}.
|
||||
|
||||
%warnings_as_errors, warn_untyped_record,
|
||||
{erl_opts, [
|
||||
warn_export_all,
|
||||
warn_unused_import,
|
||||
{i, "include"},
|
||||
{src_dirs, ["src"]}
|
||||
]}.
|
||||
|
||||
{xref_checks, [undefined_function_calls]}.
|
||||
{cover_enabled, true}.
|
||||
{cover_print_enabled, true}.
|
||||
|
||||
{edoc_opts, [{dialyzer_specs, all},
|
||||
{report_missing_type, true},
|
||||
{report_type_mismatch, true},
|
||||
{pretty_print, erl_pp},
|
||||
{preprocess, true}]}.
|
||||
|
||||
{validate_app_modules, true}.
|
||||
|
||||
{eunit_opts, [verbose]}.
|
||||
|
||||
{deps, [
|
||||
{gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}
|
||||
]}.
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
{application, ecpool,
|
||||
[
|
||||
{description, "Erlang Client/Connection Pool"},
|
||||
{vsn, "0.1"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
gproc
|
||||
]},
|
||||
{mod, { ecpool_app, []}},
|
||||
{env, []}
|
||||
]}.
|
|
@ -0,0 +1,53 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool API.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool).
|
||||
|
||||
-export([name/1, start_pool/3, with_client/2, with_client/3, stop_pool/1]).
|
||||
|
||||
name(Pool) ->
|
||||
{?MODULE, Pool}.
|
||||
|
||||
start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
ecpool_pool_sup:start_link(Pool, Mod, Opts).
|
||||
|
||||
stop_pool(Pool) when is_atom(Pool) ->
|
||||
ecpool_sup:stop_pool(Pool).
|
||||
|
||||
with_client(Pool, Fun) when is_atom(Pool) ->
|
||||
Worker = gproc_pool:pick_worker({?MODULE, Pool}),
|
||||
with_worker(Worker, Fun).
|
||||
|
||||
with_client(Pool, Key, Fun) when is_atom(Pool) ->
|
||||
Worker = gproc_pool:pick_worker({?MODULE, Pool}, Key),
|
||||
with_worker(Worker, Fun).
|
||||
|
||||
with_worker(Worker, Fun) ->
|
||||
case ecpool_worker:client(Worker) of
|
||||
{ok, Client} -> Fun(Client);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc Erlang Connection/Client Pool Application.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
%% Application callbacks
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
ecpool_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc Wrap gproc_pool.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_pool).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-import(proplists, [get_value/3]).
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/2, info/1]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {name, size, type}).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% API
|
||||
%%%=============================================================================
|
||||
|
||||
start_link(Pool, Opts) ->
|
||||
gen_server:start_link(?MODULE, [Pool, Opts], []).
|
||||
|
||||
info(Pid) ->
|
||||
gen_server:call(Pid, info).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([Pool, Opts]) ->
|
||||
Schedulers = erlang:system_info(schedulers),
|
||||
PoolSize = get_value(pool_size, Opts, Schedulers),
|
||||
PoolType = get_value(pool_type, Opts, random),
|
||||
ensure_pool(ecpool:name(Pool), PoolType, [{size, PoolSize}]),
|
||||
lists:foreach(fun(I) ->
|
||||
ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I)
|
||||
end, lists:seq(1, PoolSize)),
|
||||
{ok, #state{name = Pool, size = PoolSize, type = PoolType}}.
|
||||
|
||||
ensure_pool(Pool, Type, Opts) ->
|
||||
try gproc_pool:new(Pool, Type, Opts)
|
||||
catch
|
||||
error:exists -> ok
|
||||
end.
|
||||
|
||||
ensure_pool_worker(Pool, Name, Slot) ->
|
||||
try gproc_pool:add_worker(Pool, Name, Slot)
|
||||
catch
|
||||
error:exists -> ok
|
||||
end.
|
||||
|
||||
handle_call(info, _From, State = #state{name = Pool, size = Size,
|
||||
type = Type}) ->
|
||||
Workers = gproc_pool:active_workers(ecpool:name(Pool)),
|
||||
Info = [{pool_name, Pool}, {pool_size, Size},
|
||||
{pool_type, Type}, {workers, Workers}],
|
||||
{reply, Info, State};
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, {error, unexpected_req}, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{name = Pool, size = Size}) ->
|
||||
lists:foreach(fun(I) ->
|
||||
gproc_pool:remove_worker(ecpool:name(Pool), {Pool, I})
|
||||
end, lists:seq(1, Size)),
|
||||
gproc_pool:delete(ecpool:name(Pool)).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool pool supervisor.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_pool_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% API
|
||||
-export([start_link/3]).
|
||||
|
||||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
start_link(Pool, Mod, Opts) ->
|
||||
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).
|
||||
|
||||
init([Pool, Mod, Opts]) ->
|
||||
{ok, { {one_for_all, 10, 100}, [
|
||||
{pool, {ecpool_pool, start_link, [Pool, Opts]},
|
||||
transient, 16#ffff, worker, [ecpool_pool]},
|
||||
{worker_sup, {ecpool_worker_sup, start_link, [Pool, Mod, Opts]},
|
||||
transient, infinity, supervisor, [ecpool_worker_sup]}] }}.
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool supervisor.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% API
|
||||
-export([start_link/0, start_pool/3, stop_pool/1, pools/0, pool/1]).
|
||||
|
||||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
%% @doc Start supervisor.
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
start_pool(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
supervisor:start_child(?MODULE, pool_spec(Pool, Mod, Opts)).
|
||||
|
||||
-spec stop_pool(Pool :: atom()) -> ok | {error, any()}.
|
||||
stop_pool(Pool) when is_atom(Pool) ->
|
||||
ChildId = child_id(Pool),
|
||||
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||
ok ->
|
||||
supervisor:delete_child(?MODULE, ChildId);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% @doc All Pools supervisored by ecpool_sup.
|
||||
-spec pools() -> [{atom(), pid()}].
|
||||
pools() ->
|
||||
[{Pool, Pid} || {{pool_sup, Pool}, Pid, supervisor, _}
|
||||
<- supervisor:which_children(?MODULE)].
|
||||
|
||||
%% @doc Find a pool.
|
||||
-spec pool(atom()) -> undefined | pid().
|
||||
pool(Pool) when is_atom(Pool) ->
|
||||
ChildId = child_id(Pool),
|
||||
case [Pid || {Id, Pid, supervisor, _} <- supervisor:which_children(?MODULE), Id =:= ChildId] of
|
||||
[] -> undefined;
|
||||
L -> hd(L)
|
||||
end.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Supervisor callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 10, 100}, []} }.
|
||||
|
||||
pool_spec(Pool, Mod, Opts) ->
|
||||
{child_id(Pool),
|
||||
{ecpool_pool_sup, start_link, [Pool, Mod, Opts]},
|
||||
transient, infinity, supervisor, [ecpool_pool_sup]}.
|
||||
|
||||
child_id(Pool) -> {pool_sup, Pool}.
|
||||
|
|
@ -0,0 +1,143 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015-2016 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool worker.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_worker).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/4, client/1]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {pool, id, client, mod, opts}).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Callback
|
||||
%%%=============================================================================
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-callback connect(ConnOpts :: list()) -> {ok, pid()} | {error, any()}.
|
||||
|
||||
-else.
|
||||
|
||||
-export([behaviour_info/1]).
|
||||
|
||||
behaviour_info(callbacks) ->
|
||||
[{connect, 1}];
|
||||
|
||||
behaviour_info(_Other) ->
|
||||
undefined.
|
||||
|
||||
-endif.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% API
|
||||
%%%=============================================================================
|
||||
|
||||
%% @doc Start the pool worker.
|
||||
-spec start_link(atom(), pos_integer(), module(), list()) ->
|
||||
{ok, pid()} | ignore | {error, any()}.
|
||||
start_link(Pool, Id, Mod, Opts) ->
|
||||
gen_server:start_link(?MODULE, [Pool, Id, Mod, Opts], []).
|
||||
|
||||
%% @doc Get client/connection.
|
||||
-spec client(pid()) -> undefined | pid().
|
||||
client(Pid) -> gen_server:call(Pid, client, infinity).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([Pool, Id, Mod, Opts]) ->
|
||||
process_flag(trap_exit, true),
|
||||
State = #state{pool = Pool, id = Id, mod = Mod, opts = Opts},
|
||||
case connect(State) of
|
||||
{ok, Client} ->
|
||||
gproc_pool:connect_worker(ecpool:name(Pool), {Pool, Id}),
|
||||
{ok, State#state{client = Client}};
|
||||
{error, Error} ->
|
||||
{stop, Error}
|
||||
end.
|
||||
|
||||
handle_call(client, _From, State = #state{client = undefined}) ->
|
||||
{reply, {error, disconnected}, State};
|
||||
|
||||
handle_call(client, _From, State = #state{client = Client}) ->
|
||||
{reply, {ok, Client}, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'EXIT', Pid, Reason}, State = #state{client = Pid, opts = Opts}) ->
|
||||
case proplists:get_value(auto_reconnect, Opts, false) of
|
||||
false ->
|
||||
{stop, Reason, State};
|
||||
Secs ->
|
||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||
{noreply, State#state{client = undefined}}
|
||||
end;
|
||||
|
||||
handle_info(reconnect, State = #state{opts = Opts}) ->
|
||||
case connect(State) of
|
||||
{ok, Client} ->
|
||||
{noreply, State#state{client = Client}};
|
||||
{error, _Error} ->
|
||||
Secs = proplists:get_value(auto_reconnect, Opts),
|
||||
erlang:send_after(timer:seconds(Secs), self(), reconnect),
|
||||
{noreply, State#state{client = undefined}}
|
||||
end;
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
||||
gproc_pool:disconnect_worker(ecpool:name(Pool), {Pool, Id}).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Internal Functions
|
||||
%%%=============================================================================
|
||||
|
||||
connect(#state{mod = Mod, opts = Opts}) ->
|
||||
Mod:connect(connopts(Opts, [])).
|
||||
|
||||
connopts([], Acc) ->
|
||||
Acc;
|
||||
connopts([{pool_size, _} | Opts], Acc) ->
|
||||
connopts(Opts, Acc);
|
||||
connopts([{pool_type, _} | Opts], Acc) ->
|
||||
connopts(Opts, Acc);
|
||||
connopts([{auto_reconnect, _} | Opts], Acc) ->
|
||||
connopts(Opts, Acc);
|
||||
connopts([Opt | Opts], Acc) ->
|
||||
connopts(Opts, [Opt | Acc]).
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc ecpool worker supervisor.
|
||||
%%%
|
||||
%%% @author Feng Lee <feng@emqtt.io>
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(ecpool_worker_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/3, workers/1, init/1]).
|
||||
|
||||
start_link(Pool, Mod, Opts) when is_atom(Pool) ->
|
||||
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).
|
||||
|
||||
workers(WorkerSup) ->
|
||||
[{ChildId, Pid} || {ChildId, Pid, worker, _}
|
||||
<- supervisor:which_children(WorkerSup)].
|
||||
|
||||
init([Pool, Mod, Opts]) ->
|
||||
WorkerSpec = fun(Id) ->
|
||||
{{worker, Id}, {ecpool_worker, start_link, [Pool, Id, Mod, Opts]},
|
||||
transient, 5000, worker, [ecpool_worker]}
|
||||
end,
|
||||
Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))],
|
||||
{ok, { {one_for_one, 10, 60}, Workers} }.
|
||||
|
||||
pool_size(Opts) ->
|
||||
Schedulers = erlang:system_info(schedulers),
|
||||
proplists:get_value(pool_size, Opts, Schedulers).
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
-module(ecpool_test).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(POOL_OPTS, [
|
||||
%% schedulers number
|
||||
{pool_size, 10},
|
||||
%% round-robbin | random | hash
|
||||
{pool_type, random},
|
||||
%% false | pos_integer()
|
||||
{auto_reconnect, false},
|
||||
|
||||
{host, "localhost"},
|
||||
{port, 5432},
|
||||
{username, "feng"},
|
||||
{password, ""},
|
||||
{database, "mqtt"},
|
||||
{encoding, utf8}]).
|
||||
|
||||
pool_test() ->
|
||||
application:start(gproc),
|
||||
application:start(ecpool),
|
||||
ecpool:start_pool(test_pool, test_client, ?POOL_OPTS).
|
||||
%%ecpool:stop_pool(test_pool),
|
||||
%%application:stop(ecpool),
|
||||
%%application:stop(gproc).
|
||||
|
||||
|
||||
-endif.
|
|
@ -0,0 +1,54 @@
|
|||
|
||||
-module(test_client).
|
||||
|
||||
-behaviour(ecpool_worker).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% API Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
-export([connect/1]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% API Function Definitions
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
connect(Opts) ->
|
||||
gen_server:start_link(?MODULE, [Opts], []).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Definitions
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
init(Args) ->
|
||||
{ok, Args}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% Internal Function Definitions
|
||||
%% ------------------------------------------------------------------
|
||||
|
Loading…
Reference in New Issue