diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index b25667b0a..373bbf551 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -29,7 +29,7 @@ %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ --export([start/1, resume/2, publish/2, puback/2, subscribe/2, unsubscribe/2]). +-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2]). %%start gen_server -export([start_link/3]). @@ -50,6 +50,7 @@ awaiting_ack :: map(), awaiting_rel :: map(), expires, + expire_timer, max_queue }). %% ------------------------------------------------------------------ @@ -67,10 +68,10 @@ start({false = CleanSess, ClientId, ClientPid}) -> %% ------------------------------------------------------------------ %% Session API %% ------------------------------------------------------------------ -resume(SessState = #session_state{}, _ClientPid) -> +resume(SessState = #session_state{}, _ClientId, _ClientPid) -> SessState; -resume(SessPid, ClientPid) when is_pid(SessPid) -> - gen_server:cast(SessPid, {resume, ClientPid}), +resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> + gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid. publish(_, {?QOS_0, Message}) -> @@ -187,6 +188,17 @@ handle_call({unsubscribe, Topics}, _From, State) -> handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. +handle_cast({resume, ClientId, ClientPid}, State = #session_state { + client_id = ClientId, + client_pid = undefined, + messages = Messages, + expire_timer = ETimer}) -> + lager:info("Session: client ~s resumed by ~p", [ClientId, ClientPid]), + erlang:cancel_timer(ETimer), + [ClientPid ! {dispatch, {self(), Message}} || Message <- Messages], + NewState = State#session_state{ client_pid = ClientPid, messages = [], expire_timer = undefined}, + {noreply, NewState}; + handle_cast({publish, ?QOS_2, Message}, State) -> NewState = publish(State, {?QOS_2, Message}), {noreply, NewState}; @@ -210,6 +222,27 @@ handle_cast({pubcomp, PacketId}, State) -> handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. +handle_info({dispatch, {_From, Message}}, State = #session_state{ + client_pid = undefined, messages = Messages}) -> + %%TODO: queue len + NewState = State#session_state{messages = [Message | Messages]}, + {noreply, NewState}; + +handle_info({dispatch, {_From, Message}}, State = #session_state{client_pid = ClientPid}) -> + %%TODO: replace From with self(), ok? + ClientPid ! {dispatch, {self(), Message}}, + {noreply, State}; + +handle_info({'EXIT', ClientPid, Reason}, State = #session_state{ + client_id = ClientId, client_pid = ClientPid, expires = Expires}) -> + lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), + Timer = erlang:send_after(Expires * 1000, self(), session_expired), + {noreply, State#session_state{ client_pid = undefined, expire_timer = Timer}}; + +handle_info(session_expired, State = #session_state{client_id = ClientId}) -> + lager:warning("Session: ~s session expired!", [ClientId]), + {stop, {shutdown, expired}, State}; + handle_info(Info, State) -> {stop, {badinfo, Info}, State}. diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index e473e057c..406c699be 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -116,7 +116,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = case ets:lookup(?TABLE, ClientId) of [{_, SessPid, _MRef}] -> - emqtt_session:resume(SessPid, ClientPid), + emqtt_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqtt_session_sup:start_session(ClientId, ClientPid) of