emqttd_sm_sup
This commit is contained in:
parent
aa703dea36
commit
e911025811
|
@ -71,7 +71,7 @@ start_servers(Sup) ->
|
|||
{"emqttd trace", emqttd_trace},
|
||||
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
||||
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
||||
{"emqttd session manager", emqttd_sm},
|
||||
{"emqttd session manager", {supervisor, emqttd_sm_sup}},
|
||||
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
|
||||
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
|
||||
{"emqttd stats", emqttd_stats},
|
||||
|
|
|
@ -44,10 +44,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/0]).
|
||||
-export([start_link/3]).
|
||||
|
||||
-export([lookup_session/1, start_session/2, destroy_session/1]).
|
||||
|
||||
|
@ -55,16 +53,19 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {tabid, statsfun}).
|
||||
-record(state, {id, tabid, statsfun}).
|
||||
|
||||
-define(SESSION_TAB, mqtt_session).
|
||||
-define(POOL, sm_pool).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% API
|
||||
%%%=============================================================================
|
||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
||||
Id :: pos_integer(),
|
||||
TabId :: ets:tid(),
|
||||
StatsFun :: fun().
|
||||
start_link(Id, TabId, StatsFun) ->
|
||||
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Lookup Session Pid
|
||||
|
@ -72,7 +73,7 @@ start_link() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec lookup_session(binary()) -> pid() | undefined.
|
||||
lookup_session(ClientId) ->
|
||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||
case ets:lookup(emqttd_sm_sup:table(), ClientId) of
|
||||
[{_, SessPid, _}] -> SessPid;
|
||||
[] -> undefined
|
||||
end.
|
||||
|
@ -83,7 +84,8 @@ lookup_session(ClientId) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
|
||||
start_session(ClientId, ClientPid) ->
|
||||
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
|
||||
SmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
gen_server:call(SmPid, {start_session, ClientId, ClientPid}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Destroy a session
|
||||
|
@ -91,17 +93,16 @@ start_session(ClientId, ClientPid) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec destroy_session(binary()) -> ok.
|
||||
destroy_session(ClientId) ->
|
||||
gen_server:call(?SERVER, {destroy_session, ClientId}).
|
||||
SmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
gen_server:call(SmPid, {destroy_session, ClientId}).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
|
||||
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
||||
{ok, #state{tabid = TabId, statsfun = StatsFun}}.
|
||||
init([Id, TabId, StatsFun]) ->
|
||||
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
||||
{ok, #state{id = Id, tabid = TabId, statsfun = StatsFun}}.
|
||||
|
||||
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) ->
|
||||
Reply =
|
||||
|
@ -145,8 +146,8 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Ta
|
|||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
terminate(_Reason, #state{id = Id}) ->
|
||||
gproc_pool:disconnect_worker(?POOL, {?MODULE, Id}), ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
@ -155,6 +156,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
setstats(State = #state{statsfun = StatsFun}) ->
|
||||
StatsFun(ets:info(?SESSION_TAB, size)), State.
|
||||
setstats(State = #state{tabid = TabId, statsfun = StatsFun}) ->
|
||||
StatsFun(ets:info(TabId, size)), State.
|
||||
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqttd client manager supervisor.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqttd_sm_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/0, table/0]).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
-define(SESSION_TAB, mqtt_session).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
table() -> ?SESSION_TAB.
|
||||
|
||||
init([]) ->
|
||||
TabId = ets:new(?SESSION_TAB, [set, named_table, public,
|
||||
{write_concurrency, true}]),
|
||||
Schedulers = erlang:system_info(schedulers),
|
||||
gproc_pool:new(sm_pool, hash, [{size, Schedulers}]),
|
||||
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
||||
Children = lists:map(
|
||||
fun(I) ->
|
||||
Name = {emqttd_sm, I},
|
||||
gproc_pool:add_worker(sm_pool, Name, I),
|
||||
{Name, {emqttd_sm, start_link, [I, TabId, StatsFun]},
|
||||
permanent, 10000, worker, [emqttd_sm]}
|
||||
end, lists:seq(1, Schedulers)),
|
||||
{ok, {{one_for_all, 10, 100}, Children}}.
|
||||
|
||||
|
Loading…
Reference in New Issue