From 20216670216399accd330e9296176ff16d2304ad Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 16 Feb 2017 11:29:55 +0800 Subject: [PATCH] Add 'local_session/0 function, change reg_session/3 unreg_session/1 functions --- src/emqttd_sm.erl | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 704242b2d..02dfdb879 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,9 +14,10 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Session Manager -module(emqttd_sm). +-author("Feng Lee "). + -behaviour(gen_server2). -include("emqttd.hrl"). @@ -32,10 +33,12 @@ %% API Function Exports -export([start_link/2]). --export([start_session/2, lookup_session/1, reg_session/3, unreg_session/1]). +-export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). -export([dispatch/3]). +-export([local_sessions/0]). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -91,13 +94,13 @@ lookup_session(ClientId) -> end. %% @doc Register a session with info. --spec(reg_session(binary(), boolean(), [tuple()]) -> true). -reg_session(ClientId, CleanSess, Properties) -> +-spec(register_session(binary(), boolean(), [tuple()]) -> true). +register_session(ClientId, CleanSess, Properties) -> ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). %% @doc Unregister a session. --spec(unreg_session(binary()) -> true). -unreg_session(ClientId) -> +-spec(unregister_session(binary()) -> true). +unregister_session(ClientId) -> ets:delete(mqtt_local_session, ClientId). dispatch(ClientId, Topic, Msg) -> @@ -110,8 +113,12 @@ dispatch(ClientId, Topic, Msg) -> call(SM, Req) -> gen_server2:call(SM, Req, ?TIMEOUT). %%infinity). +%% @doc for debug. +local_sessions() -> + ets:tab2list(mqtt_local_session). + %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- init([Pool, Id]) -> @@ -171,6 +178,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> [] -> ok; [Sess = #mqtt_session{sess_pid = DownPid}] -> + emqttd_stats:del_session_stats(ClientId), mnesia:delete_object(mqtt_session, Sess, write); [_Sess] -> ok @@ -208,7 +216,7 @@ create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> create_session(CleanSess, {ClientId, Username}, ClientPid) -> case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> - Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess}, + Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, clean_sess = CleanSess}, case insert_session(Session) of {aborted, {conflict, ConflictPid}} -> %% Conflict with othe node?