diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index a536029af..ff92d4f05 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -350,7 +350,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> end, Session1, lists:reverse(InflightQ)), %% Dequeue pending messages - {noreply, dequeue(Session2), hibernate}; + noreply(dequeue(Session2)); %% PUBRAC handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> @@ -358,10 +358,10 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a {ok, {_, TRef}} -> cancel_timer(TRef), Session1 = acked(PktId, Session), - {noreply, dequeue(Session1)}; + noreply(dequeue(Session1)); error -> lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), - {noreply, Session} + noreply(Session) end; %% PUBREC @@ -374,10 +374,10 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, cancel_timer(TRef), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), - {noreply, dequeue(Session1)}; + noreply(dequeue(Session1)); error -> lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), - {noreply, Session} + noreply(Session) end; %% PUBREL @@ -387,10 +387,10 @@ handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId, {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), - {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; + noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), - {noreply, Session} + noreply(Session) end; %% PUBCOMP @@ -398,10 +398,10 @@ handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_ case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), - {noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}}; + noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); error -> lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), - {noreply, Session} + noreply(Session) end; handle_cast(Msg, State) -> @@ -412,13 +412,13 @@ handle_cast(Msg, State) -> handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, message_queue = Q}) when is_record(Msg, mqtt_message) -> - {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}}; + noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); %% Dispatch qos0 message directly to client handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, - {noreply, Session}; + noreply(Session); handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{client_id = ClientId, message_queue = MsgQ}) @@ -435,7 +435,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> %% just remove awaiting - {noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}}; + noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, @@ -444,7 +444,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = Clien {ok, {{0, _Timeout}, _TRef}} -> Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), awaiting_ack = maps:remove(PktId, AwaitingAck)}, - {noreply, dequeue(Session1)}; + noreply(dequeue(Session1)); {ok, {{Retries, Timeout}, _TRef}} -> TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), @@ -461,7 +461,7 @@ handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = Clien {ok, {Msg, _TRef}} -> lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" "Drop Message:~p", [ClientId, Msg]), - {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; + noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), {noreply, Session} @@ -473,11 +473,11 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = Clie {ok, _TRef} -> lager:error([{client, ClientId}], "Session ~s " "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), - {noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}}; + noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); error -> lager:error([{client, ClientId}], "Session ~s " "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), - {noreply, Session} + noreply(Session) end; handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, @@ -491,10 +491,10 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), - {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; + noreply(Session#session{client_pid = undefined, expired_timer = TRef}); handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, - client_pid = ClientPid}) -> + client_pid = ClientPid}) -> lager:error("Session ~s received unexpected EXIT:" " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), @@ -623,3 +623,5 @@ cancel_timer(undefined) -> cancel_timer(Ref) -> catch erlang:cancel_timer(Ref). +noreply(State) -> + {noreply, State, hibernate}.