fix pool, table

This commit is contained in:
Feng Lee 2015-05-22 18:39:35 +08:00
parent e911025811
commit f75c807aaf
4 changed files with 81 additions and 67 deletions

View File

@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
%% API Exports
-export([start_link/3]).
-export([start_link/2, pool/0, table/0]).
-export([lookup/1, register/1, unregister/1]).
@ -43,7 +43,9 @@
-record(state, {id, tab, statsfun}).
-define(POOL, cm_pool).
-define(CM_POOL, cm_pool).
-define(CLIENT_TAB, mqtt_client).
%%%=============================================================================
%%% API
@ -53,12 +55,15 @@
%% @doc Start client manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
TabId :: ets:tid(),
StatsFun :: fun().
start_link(Id, TabId, StatsFun) ->
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []).
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL.
table() -> ?CLIENT_TAB.
%%------------------------------------------------------------------------------
%% @doc Lookup client pid with clientId
@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) ->
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(emqttd_cm_sup:table(), ClientId) of
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid;
[] -> undefined
end.
@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------
-spec register(ClientId :: binary()) -> ok.
register(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:call(CmPid, {register, ClientId, self()}, infinity).
%%------------------------------------------------------------------------------
@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) ->
%%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Id, TabId, StatsFun]) ->
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
{ok, #state{id = Id, tab = TabId, statsfun = StatsFun}}.
init([Id, StatsFun]) ->
gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
case ets:lookup(Tab, ClientId) of
handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore;
@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef),
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)});
[] ->
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)})
end,
{reply, ok, setstats(State)};
@ -116,11 +121,11 @@ handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
case ets:lookup(TabId, ClientId) of
handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, MRef}] ->
erlang:demonitor(MRef, [flush]),
ets:delete(TabId, ClientId);
ets:delete(?CLIENT_TAB, ClientId);
[_] ->
ignore;
[] ->
@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) ->
ets:match_delete(TabId, {'_', DownPid, MRef}),
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
{noreply, setstats(State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{id = Id}) ->
gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok.
gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
setstats(State = #state{tab = TabId, statsfun = StatsFun}) ->
StatsFun(ets:info(TabId, size)), State.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?CLIENT_TAB, size)), State.

View File

@ -33,30 +33,26 @@
-behaviour(supervisor).
%% API
-export([start_link/0, table/0]).
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CLIENT_TAB, mqtt_client).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
table() -> ?CLIENT_TAB.
init([]) ->
TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
{write_concurrency, true}]),
ets:new(emqttd_cm:table(), [set, named_table, public,
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(cm_pool, hash, [{size, Schedulers}]),
gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_cm, I},
gproc_pool:add_worker(cm_pool, Name, I),
{Name, {emqttd_cm, start_link, [I, TabId, StatsFun]},
permanent, 10000, worker, [emqttd_cm]}
gproc_pool:add_worker(emqttd_cm:pool(), Name, I),
{Name, {emqttd_cm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_cm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -45,7 +45,7 @@
-behaviour(gen_server).
%% API Function Exports
-export([start_link/3]).
-export([start_link/2, pool/0, table/0]).
-export([lookup_session/1, start_session/2, destroy_session/1]).
@ -55,17 +55,35 @@
-record(state, {id, tabid, statsfun}).
-define(POOL, sm_pool).
-define(SM_POOL, sm_pool).
-define(SESSION_TAB, mqtt_session).
%%%=============================================================================
%%% API
%%%=============================================================================
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
%%------------------------------------------------------------------------------
%% @doc Start a session manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
TabId :: ets:tid(),
StatsFun :: fun().
start_link(Id, TabId, StatsFun) ->
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []).
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
%%------------------------------------------------------------------------------
%% @doc Pool name.
%% @end
%%------------------------------------------------------------------------------
pool() -> ?SM_POOL.
%%------------------------------------------------------------------------------
%% @doc Table name.
%% @end
%%------------------------------------------------------------------------------
table() -> ?SESSION_TAB.
%%------------------------------------------------------------------------------
%% @doc Lookup Session Pid
@ -84,7 +102,7 @@ lookup_session(ClientId) ->
%%------------------------------------------------------------------------------
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
start_session(ClientId, ClientPid) ->
SmPid = gproc_pool:pick_worker(?POOL, ClientId),
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {start_session, ClientId, ClientPid}).
%%------------------------------------------------------------------------------
@ -93,28 +111,27 @@ start_session(ClientId, ClientPid) ->
%%------------------------------------------------------------------------------
-spec destroy_session(binary()) -> ok.
destroy_session(ClientId) ->
SmPid = gproc_pool:pick_worker(?POOL, ClientId),
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {destroy_session, ClientId}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Id, TabId, StatsFun]) ->
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
{ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}.
init([Id, StatsFun]) ->
gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply =
case ets:lookup(Tab, ClientId) of
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid};
[] ->
case emqttd_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} ->
ets:insert(Tab, {ClientId, SessPid,
erlang:monitor(process, SessPid)}),
ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
{ok, SessPid};
{error, Error} ->
{error, Error}
@ -122,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid =
end,
{reply, Reply, setstats(State)};
handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) ->
case ets:lookup(Tab, ClientId) of
handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] ->
emqttd_session:destroy(SessPid, ClientId),
erlang:demonitor(MRef, [flush]),
ets:delete(Tab, ClientId);
ets:delete(?SESSION_TAB, ClientId);
[] ->
ignore
end,
@ -147,7 +164,7 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{id = Id}) ->
gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok.
gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -156,6 +173,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
setstats(State = #state{tabid = TabId, statsfun = StatsFun}) ->
StatsFun(ets:info(TabId, size)), State.
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?SESSION_TAB, size)), State.

View File

@ -31,32 +31,28 @@
-include("emqttd.hrl").
%% API
-export([start_link/0, table/0]).
-export([start_link/0]).
-behaviour(supervisor).
%% Supervisor callbacks
-export([init/1]).
-define(SESSION_TAB, mqtt_session).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
table() -> ?SESSION_TAB.
init([]) ->
TabId = ets:new(?SESSION_TAB, [set, named_table, public,
{write_concurrency, true}]),
ets:new(emqttd_sm:table(), [set, named_table, public,
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(sm_pool, hash, [{size, Schedulers}]),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_sm, I},
gproc_pool:add_worker(sm_pool, Name, I),
{Name, {emqttd_sm, start_link, [I, TabId, StatsFun]},
permanent, 10000, worker, [emqttd_sm]}
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.