hibernate

This commit is contained in:
Feng 2015-07-09 10:08:35 +08:00
parent 30875c0706
commit 0b12082256
1 changed files with 20 additions and 18 deletions

View File

@ -350,7 +350,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
end, Session1, lists:reverse(InflightQ)), end, Session1, lists:reverse(InflightQ)),
%% Dequeue pending messages %% Dequeue pending messages
{noreply, dequeue(Session2), hibernate}; noreply(dequeue(Session2));
%% PUBRAC %% PUBRAC
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
@ -358,10 +358,10 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a
{ok, {_, TRef}} -> {ok, {_, TRef}} ->
cancel_timer(TRef), cancel_timer(TRef),
Session1 = acked(PktId, Session), Session1 = acked(PktId, Session),
{noreply, dequeue(Session1)}; noreply(dequeue(Session1));
error -> error ->
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
{noreply, Session} noreply(Session)
end; end;
%% PUBREC %% PUBREC
@ -374,10 +374,10 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
cancel_timer(TRef), cancel_timer(TRef),
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}),
{noreply, dequeue(Session1)}; noreply(dequeue(Session1));
error -> error ->
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
{noreply, Session} noreply(Session)
end; end;
%% PUBREL %% PUBREL
@ -387,10 +387,10 @@ handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId,
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
cancel_timer(TRef), cancel_timer(TRef),
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(Msg),
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]),
{noreply, Session} noreply(Session)
end; end;
%% PUBCOMP %% PUBCOMP
@ -398,10 +398,10 @@ handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_
case maps:find(PktId, AwaitingComp) of case maps:find(PktId, AwaitingComp) of
{ok, TRef} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
{noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}}; noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
error -> error ->
lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]),
{noreply, Session} noreply(Session)
end; end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
@ -412,13 +412,13 @@ handle_cast(Msg, State) ->
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
message_queue = Q}) message_queue = Q})
when is_record(Msg, mqtt_message) -> when is_record(Msg, mqtt_message) ->
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}}; noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
%% Dispatch qos0 message directly to client %% Dispatch qos0 message directly to client
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
Session = #session{client_pid = ClientPid}) -> Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg}, ClientPid ! {deliver, Msg},
{noreply, Session}; noreply(Session);
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
Session = #session{client_id = ClientId, message_queue = MsgQ}) Session = #session{client_id = ClientId, message_queue = MsgQ})
@ -435,7 +435,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
%% just remove awaiting %% just remove awaiting
{noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}}; noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ, inflight_queue = InflightQ,
@ -444,7 +444,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = Clien
{ok, {{0, _Timeout}, _TRef}} -> {ok, {{0, _Timeout}, _TRef}} ->
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
awaiting_ack = maps:remove(PktId, AwaitingAck)}, awaiting_ack = maps:remove(PktId, AwaitingAck)},
{noreply, dequeue(Session1)}; noreply(dequeue(Session1));
{ok, {{Retries, Timeout}, _TRef}} -> {ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
@ -461,7 +461,7 @@ handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = Clien
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n"
"Drop Message:~p", [ClientId, Msg]), "Drop Message:~p", [ClientId, Msg]),
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
error -> error ->
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]),
{noreply, Session} {noreply, Session}
@ -473,11 +473,11 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie
{ok, _TRef} -> {ok, _TRef} ->
lager:error([{client, ClientId}], "Session ~s " lager:error([{client, ClientId}], "Session ~s "
"Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]),
{noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}}; noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
error -> error ->
lager:error([{client, ClientId}], "Session ~s " lager:error([{client, ClientId}], "Session ~s "
"Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]),
{noreply, Session} noreply(Session)
end; end;
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
@ -491,10 +491,10 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals
lager:info("Session ~s unlink with client ~p: reason=~p", lager:info("Session ~s unlink with client ~p: reason=~p",
[ClientId, ClientPid, Reason]), [ClientId, ClientPid, Reason]),
TRef = timer(Expires, session_expired), TRef = timer(Expires, session_expired),
{noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; noreply(Session#session{client_pid = undefined, expired_timer = TRef});
handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
lager:error("Session ~s received unexpected EXIT:" lager:error("Session ~s received unexpected EXIT:"
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]),
@ -623,3 +623,5 @@ cancel_timer(undefined) ->
cancel_timer(Ref) -> cancel_timer(Ref) ->
catch erlang:cancel_timer(Ref). catch erlang:cancel_timer(Ref).
noreply(State) ->
{noreply, State, hibernate}.