Add 'local_session/0 function, change reg_session/3 unreg_session/1 functions

This commit is contained in:
Feng Lee 2017-02-16 11:29:55 +08:00
parent 5a49196a07
commit 2021667021
1 changed files with 17 additions and 9 deletions

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -14,9 +14,10 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Session Manager
-module(emqttd_sm). -module(emqttd_sm).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server2). -behaviour(gen_server2).
-include("emqttd.hrl"). -include("emqttd.hrl").
@ -32,10 +33,12 @@
%% API Function Exports %% API Function Exports
-export([start_link/2]). -export([start_link/2]).
-export([start_session/2, lookup_session/1, reg_session/3, unreg_session/1]). -export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]).
-export([dispatch/3]). -export([dispatch/3]).
-export([local_sessions/0]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -91,13 +94,13 @@ lookup_session(ClientId) ->
end. end.
%% @doc Register a session with info. %% @doc Register a session with info.
-spec(reg_session(binary(), boolean(), [tuple()]) -> true). -spec(register_session(binary(), boolean(), [tuple()]) -> true).
reg_session(ClientId, CleanSess, Properties) -> register_session(ClientId, CleanSess, Properties) ->
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
%% @doc Unregister a session. %% @doc Unregister a session.
-spec(unreg_session(binary()) -> true). -spec(unregister_session(binary()) -> true).
unreg_session(ClientId) -> unregister_session(ClientId) ->
ets:delete(mqtt_local_session, ClientId). ets:delete(mqtt_local_session, ClientId).
dispatch(ClientId, Topic, Msg) -> dispatch(ClientId, Topic, Msg) ->
@ -110,8 +113,12 @@ dispatch(ClientId, Topic, Msg) ->
call(SM, Req) -> call(SM, Req) ->
gen_server2:call(SM, Req, ?TIMEOUT). %%infinity). gen_server2:call(SM, Req, ?TIMEOUT). %%infinity).
%% @doc for debug.
local_sessions() ->
ets:tab2list(mqtt_local_session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Pool, Id]) -> init([Pool, Id]) ->
@ -171,6 +178,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
[] -> [] ->
ok; ok;
[Sess = #mqtt_session{sess_pid = DownPid}] -> [Sess = #mqtt_session{sess_pid = DownPid}] ->
emqttd_stats:del_session_stats(ClientId),
mnesia:delete_object(mqtt_session, Sess, write); mnesia:delete_object(mqtt_session, Sess, write);
[_Sess] -> [_Sess] ->
ok ok
@ -208,7 +216,7 @@ create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
create_session(CleanSess, {ClientId, Username}, ClientPid) -> create_session(CleanSess, {ClientId, Username}, ClientPid) ->
case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess}, Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, clean_sess = CleanSess},
case insert_session(Session) of case insert_session(Session) of
{aborted, {conflict, ConflictPid}} -> {aborted, {conflict, ConflictPid}} ->
%% Conflict with othe node? %% Conflict with othe node?