From e91102581193086ad5585f642b5118ec6f5e8f5d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 19 May 2015 00:14:20 +0800 Subject: [PATCH] emqttd_sm_sup --- apps/emqttd/src/emqttd_app.erl | 2 +- apps/emqttd/src/emqttd_sm.erl | 41 ++++++++++---------- apps/emqttd/src/emqttd_sm_sup.erl | 63 +++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 21 deletions(-) create mode 100644 apps/emqttd/src/emqttd_sm_sup.erl diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 80bbd968e..95018f5eb 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -71,7 +71,7 @@ start_servers(Sup) -> {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, - {"emqttd session manager", emqttd_sm}, + {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index ef26cb44e..6a1c87325 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -44,10 +44,8 @@ -behaviour(gen_server). --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/0]). +-export([start_link/3]). -export([lookup_session/1, start_session/2, destroy_session/1]). @@ -55,16 +53,19 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tabid, statsfun}). +-record(state, {id, tabid, statsfun}). --define(SESSION_TAB, mqtt_session). +-define(POOL, sm_pool). %%%============================================================================= %%% API %%%============================================================================= --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + TabId :: ets:tid(), + StatsFun :: fun(). +start_link(Id, TabId, StatsFun) -> + gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -72,7 +73,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?SESSION_TAB, ClientId) of + case ets:lookup(emqttd_sm_sup:table(), ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -83,7 +84,8 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> - gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). + SmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(SmPid, {start_session, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session @@ -91,17 +93,16 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - gen_server:call(?SERVER, {destroy_session, ClientId}). + SmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(SmPid, {destroy_session, ClientId}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - process_flag(trap_exit, true), - TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - {ok, #state{tabid = TabId, statsfun = StatsFun}}. +init([Id, TabId, StatsFun]) -> + gproc_pool:connect_worker(?POOL, {?MODULE, Id}), + {ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}. handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> Reply = @@ -145,8 +146,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{id = Id}) -> + gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -155,6 +156,6 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?SESSION_TAB, size)), State. +setstats(State = #state{tabid = TabId, statsfun = StatsFun}) -> + StatsFun(ets:info(TabId, size)), State. diff --git a/apps/emqttd/src/emqttd_sm_sup.erl b/apps/emqttd/src/emqttd_sm_sup.erl new file mode 100644 index 000000000..15bdf13ec --- /dev/null +++ b/apps/emqttd/src/emqttd_sm_sup.erl @@ -0,0 +1,63 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_sm_sup). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +%% API +-export([start_link/0, table/0]). + +-behaviour(supervisor). + +%% Supervisor callbacks +-export([init/1]). + +-define(SESSION_TAB, mqtt_session). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +table() -> ?SESSION_TAB. + +init([]) -> + TabId = ets:new(?SESSION_TAB, [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(sm_pool, hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + Children = lists:map( + fun(I) -> + Name = {emqttd_sm, I}, + gproc_pool:add_worker(sm_pool, Name, I), + {Name, {emqttd_sm, start_link, [I, TabId, StatsFun]}, + permanent, 10000, worker, [emqttd_sm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + +