diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index d425646e6..7758f8ec0 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup/1, register/1, unregister/1]). @@ -43,7 +43,9 @@ -record(state, {id, tab, statsfun}). --define(POOL, cm_pool). +-define(CM_POOL, cm_pool). + +-define(CLIENT_TAB, mqtt_client). %%%============================================================================= %%% API @@ -53,12 +55,15 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +pool() -> ?CM_POOL. + +table() -> ?CLIENT_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -66,7 +71,7 @@ start_link(Id, TabId, StatsFun) -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(emqttd_cm_sup:table(), ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -77,7 +82,7 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ @@ -86,19 +91,19 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?POOL, ClientId), + CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{id = Id, tab = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({register, ClientId, Pid}, _From, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; @@ -106,9 +111,9 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); [] -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -116,11 +121,11 @@ handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> - case ets:lookup(TabId, ClientId) of +handle_cast({unregister, ClientId, Pid}, State) -> + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(TabId, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> @@ -131,15 +136,15 @@ handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> - ets:match_delete(TabId, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -148,6 +153,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?CLIENT_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 666b87548..53a338404 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -33,30 +33,26 @@ -behaviour(supervisor). %% API --export([start_link/0, table/0]). +-export([start_link/0]). %% Supervisor callbacks -export([init/1]). --define(CLIENT_TAB, mqtt_client). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?CLIENT_TAB. - init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_cm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), Children = lists:map( fun(I) -> Name = {emqttd_cm, I}, - gproc_pool:add_worker(cm_pool, Name, I), - {Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} + gproc_pool:add_worker(emqttd_cm:pool(), Name, I), + {Name, {emqttd_cm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_cm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 6a1c87325..05d9e62b0 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -45,7 +45,7 @@ -behaviour(gen_server). %% API Function Exports --export([start_link/3]). +-export([start_link/2, pool/0, table/0]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,17 +55,35 @@ -record(state, {id, tabid, statsfun}). --define(POOL, sm_pool). +-define(SM_POOL, sm_pool). + +-define(SESSION_TAB, mqtt_session). %%%============================================================================= %%% API %%%============================================================================= --spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + +%%------------------------------------------------------------------------------ +%% @doc Start a session manager +%% @end +%%------------------------------------------------------------------------------ +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(), StatsFun :: fun(). -start_link(Id, TabId, StatsFun) -> - gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). + +%%------------------------------------------------------------------------------ +%% @doc Pool name. +%% @end +%%------------------------------------------------------------------------------ +pool() -> ?SM_POOL. + +%%------------------------------------------------------------------------------ +%% @doc Table name. +%% @end +%%------------------------------------------------------------------------------ +table() -> ?SESSION_TAB. %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -84,7 +102,7 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - SmPid = gproc_pool:pick_worker(?POOL, ClientId), + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ @@ -93,28 +111,27 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - SmPid = gproc_pool:pick_worker(?POOL, ClientId), + SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, TabId, StatsFun]) -> - gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - {ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}. +init([Id, StatsFun]) -> + gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), + {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> +handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = - case ets:lookup(Tab, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(Tab, {ClientId, SessPid, - erlang:monitor(process, SessPid)}), + ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> {error, Error} @@ -122,12 +139,12 @@ handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = end, {reply, Reply, setstats(State)}; -handle_call({destroy_session, ClientId}, _From, State = #state{tabid = Tab}) -> - case ets:lookup(Tab, ClientId) of +handle_call({destroy_session, ClientId}, _From, State) -> + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), - ets:delete(Tab, ClientId); + ets:delete(?SESSION_TAB, ClientId); [] -> ignore end, @@ -147,7 +164,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?SM_POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -156,6 +173,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{tabid = TabId, statsfun = StatsFun}) -> - StatsFun(ets:info(TabId, size)), State. +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(?SESSION_TAB, size)), State. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl index 15bdf13ec..ece44dd38 100644 --- a/apps/emqttd/src/emqttd_sm_sup.erl +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -31,32 +31,28 @@ -include("emqttd.hrl"). %% API --export([start_link/0, table/0]). +-export([start_link/0]). -behaviour(supervisor). %% Supervisor callbacks -export([init/1]). --define(SESSION_TAB, mqtt_session). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -table() -> ?SESSION_TAB. - init([]) -> - TabId = ets:new(?SESSION_TAB, [set, named_table, public, - {write_concurrency, true}]), + ets:new(emqttd_sm:table(), [set, named_table, public, + {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(sm_pool, hash, [{size, Schedulers}]), + gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, - gproc_pool:add_worker(sm_pool, Name, I), - {Name, {emqttd_sm, start_link, [I, TabId, StatsFun]}, - permanent, 10000, worker, [emqttd_sm]} + gproc_pool:add_worker(emqttd_sm:pool(), Name, I), + {Name, {emqttd_sm, start_link, [I, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}.