session resume and expired

This commit is contained in:
Ery Lee 2015-01-14 14:08:39 +08:00
parent 85be3eef49
commit fb56eee21d
2 changed files with 38 additions and 5 deletions

View File

@ -29,7 +29,7 @@
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start/1, resume/2, publish/2, puback/2, subscribe/2, unsubscribe/2]).
-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2]).
%%start gen_server
-export([start_link/3]).
@ -50,6 +50,7 @@
awaiting_ack :: map(),
awaiting_rel :: map(),
expires,
expire_timer,
max_queue }).
%% ------------------------------------------------------------------
@ -67,10 +68,10 @@ start({false = CleanSess, ClientId, ClientPid}) ->
%% ------------------------------------------------------------------
%% Session API
%% ------------------------------------------------------------------
resume(SessState = #session_state{}, _ClientPid) ->
resume(SessState = #session_state{}, _ClientId, _ClientPid) ->
SessState;
resume(SessPid, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientPid}),
resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientId, ClientPid}),
SessPid.
publish(_, {?QOS_0, Message}) ->
@ -187,6 +188,17 @@ handle_call({unsubscribe, Topics}, _From, State) ->
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session_state {
client_id = ClientId,
client_pid = undefined,
messages = Messages,
expire_timer = ETimer}) ->
lager:info("Session: client ~s resumed by ~p", [ClientId, ClientPid]),
erlang:cancel_timer(ETimer),
[ClientPid ! {dispatch, {self(), Message}} || Message <- Messages],
NewState = State#session_state{ client_pid = ClientPid, messages = [], expire_timer = undefined},
{noreply, NewState};
handle_cast({publish, ?QOS_2, Message}, State) ->
NewState = publish(State, {?QOS_2, Message}),
{noreply, NewState};
@ -210,6 +222,27 @@ handle_cast({pubcomp, PacketId}, State) ->
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({dispatch, {_From, Message}}, State = #session_state{
client_pid = undefined, messages = Messages}) ->
%%TODO: queue len
NewState = State#session_state{messages = [Message | Messages]},
{noreply, NewState};
handle_info({dispatch, {_From, Message}}, State = #session_state{client_pid = ClientPid}) ->
%%TODO: replace From with self(), ok?
ClientPid ! {dispatch, {self(), Message}},
{noreply, State};
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{
client_id = ClientId, client_pid = ClientPid, expires = Expires}) ->
lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
{noreply, State#session_state{ client_pid = undefined, expire_timer = Timer}};
handle_info(session_expired, State = #session_state{client_id = ClientId}) ->
lager:warning("Session: ~s session expired!", [ClientId]),
{stop, {shutdown, expired}, State};
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.

View File

@ -116,7 +116,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply =
case ets:lookup(?TABLE, ClientId) of
[{_, SessPid, _MRef}] ->
emqtt_session:resume(SessPid, ClientPid),
emqtt_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid};
[] ->
case emqtt_session_sup:start_session(ClientId, ClientPid) of