diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 6900924bb..0e2a22354 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -138,10 +138,8 @@ handle_packet(?CONNECT, Packet = #mqtt_packet { send_packet( #mqtt_packet { header = #mqtt_packet_header { type = ?CONNACK }, variable = #mqtt_packet_connack{ return_code = ReturnCode1 }}, State1 ), - %% + %%Starting session {ok, Session} = emqtt_session:start({CleanSess, ClientId, self()}), - emqtt_session:resume(Session), - %%TODO: Resume session {ok, State1#proto_state { session = Session }}; handle_packet(?PUBLISH, Packet = #mqtt_packet { diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index ae6271098..059a70e34 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -31,6 +31,8 @@ %% ------------------------------------------------------------------ -export([start/1, resume/1, publish/2, puback/2]). +%%start gen_server +-export([start_link/3]). %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ @@ -45,22 +47,30 @@ subscriptions = [], messages = [], %% do not receive rel awaiting_ack, - awaiting_rel }). + awaiting_rel, + expires, + max_queue }). %% ------------------------------------------------------------------ -%% API Function Definitions +%% Start Session %% ------------------------------------------------------------------ -start({true = CleanSess, ClientId, ClientPid}) -> - %%destroy old session - %%TODO: emqtt_sm:destory_session(ClientId), +start({true = CleanSess, ClientId, _ClientPid}) -> + %%Destroy old session if CleanSess is true before. + ok = emqtt_sm:destory_session(ClientId), {ok, initial_state(ClientId)}; start({false = CleanSess, ClientId, ClientPid}) -> - %%TODO: emqtt_sm:start_session({ClientId, ClientPid}) - gen_server:start_link(?MODULE, [ClientId, ClientPid], []). + {ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid), + {ok, SessPid}. -resume(#session_state {}) -> 'TODO'; -resume(SessPid) when is_pid(SessPid) -> 'TODO'. +%% ------------------------------------------------------------------ +%% Session API +%% ------------------------------------------------------------------ +resume(SessState = #session_state{}, _ClientPid) -> + SessState; +resume(SessPid, ClientPid) when is_pid(SessPid) -> + gen_server:cast(SessPid, {resume, ClientPid}), + SessPid. publish(_, {?QOS_0, Message}) -> emqtt_router:route(Message); @@ -113,10 +123,16 @@ initial_state(ClientId, ClientPid) -> %% gen_server Function Definitions %% ------------------------------------------------------------------ -init([ClientId, ClientPid]) -> +start_link(SessOpts, ClientId, ClientPid) -> + gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). + +init([SessOpts, ClientId, ClientPid]) -> process_flag(trap_exit, true), + %%TODO: OK? + true = link(ClientPid), State = initial_state(ClientId, ClientPid), - {ok, State}. + {ok, State#state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600, + max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }. handle_call(_Request, _From, State) -> {reply, ok, State}. diff --git a/apps/emqtt/src/emqtt_session_sup.erl b/apps/emqtt/src/emqtt_session_sup.erl new file mode 100644 index 000000000..d4a118bbf --- /dev/null +++ b/apps/emqtt/src/emqtt_session_sup.erl @@ -0,0 +1,57 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_session_sup). + +-author('feng@emqtt.io'). + +-behavior(supervisor). + +-export([start_link/0, start_session/2]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (list(tuple())) -> {ok, pid()}). + +-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(SessOpts) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]). + +start_session(ClientId, ClientPid) -> + supervisor:start_child(?MODULE, [ClientId, ClientPid]). + +init([SessOpts]) -> + {ok, {{simple_one_for_one, 0, 1}, + [{session, {emqtt_session, start_link, [SessOpts]}, + transient, 10000, worker, [emqtt_session]}]}}. + + + diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index 791d60ad7..7714034ef 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -47,13 +47,15 @@ -define(SERVER, ?MODULE). +-define(TABLE, emqtt_session). + %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ --export([start_link/1]). +-export([start_link/0]). --export([lookup/1, register/2, resume/2, destroy/1]). +-export([lookup_session/1, start_session/2, destroy_session/1]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -62,34 +64,82 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, { expires = 24, %hours - max_queue = 1000 }). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {ok, pid()}). + +-spec(lookup_session/1 :: (binary()) -> pid() | undefined). + +-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}). + +-spec(destroy_session/1 :: (binary()) -> ok). + +-endif. + +%%---------------------------------------------------------------------------- + +-record(state, {}). %% ------------------------------------------------------------------ %% API Function Definitions %% ------------------------------------------------------------------ -start_link(SessOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [SessOpts], []). -lookup(ClientId) -> ok. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -register(ClientId, Pid) -> ok. +lookup_session(ClientId) -> + case ets:lookup(?TABLE, ClientId) of + [{_, SessPid, _}] -> SessPid; + [] -> undefined + end. -resume(ClientId, Pid) -> ok. +start_session(ClientId, ClientPid) -> + gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). -destroy(ClientId) -> ok. +destory_session(ClientId) -> + gen_server:call(?SERVER, {destory_session, ClientId}). %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ -init(SessOpts) -> - State = #state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600, - max_queue = proplists:get_value(max_queue, SessOpts, 1000) }, +init() -> + process_flag(trap_exit, true), + ets:new(?TABLE, [set, protected, named_table]), {ok, State}. +handle_call({start_session, ClientId, ClientPid}, _From, State) -> + Reply = + case ets:lookup(?TABLE, ClientId) of + [{_, SessPid, MRef}] -> + emqtt_session:resume(SessPid, ClientPid), + {ok, SessPid}; + [] -> + case emqtt_session_sup:start_session(ClientId, ClientPid) of + {ok, SessPid} -> + MRef = erlang:monitor(process, SessPid), + ets:insert(?TABLE, {ClientId, SessPid, MRef}), + {ok, SessPid}; + {error, Error} -> + {error, Error} + end + end, + {reply, Reply, State}; + +handle_call({destory_session, ClientId}, _From, State) -> + case ets:lookup(?TABLE, ClientId) of + [{_, SessPid, MRef}] -> + erlang:demonitor(MRef), + emqtt_session:destory(SessPid), + ets:delete(?TABLE, ClientId); + [] -> + ignore + end, + {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -97,6 +147,10 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}), + {noreply, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -106,3 +160,4 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +