diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 05ceab617..a536029af 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -313,7 +313,7 @@ handle_call(Req, _From, State) -> handle_cast({resume, ClientId, ClientPid}, Session) -> - #session{client_id = ClientId, + #session{client_id = ClientId, client_pid = OldClientPid, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, @@ -417,18 +417,18 @@ handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, %% Dispatch qos0 message directly to client handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, + ClientPid ! {deliver, Msg}, {noreply, Session}; handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, - Session = #session{client_id = ClientId, message_queue = MsgQ}) + Session = #session{client_id = ClientId, message_queue = MsgQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case check_inflight(Session) of true -> {noreply, deliver(Msg, Session)}; false -> - lager:warning([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]), + lager:error([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]), {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} end; @@ -443,7 +443,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = Clien case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), - awaiting_ack = maps:remove(PktId, AwaitingAck)}, + awaiting_ack = maps:remove(PktId, AwaitingAck)}, {noreply, dequeue(Session1)}; {ok, {{Retries, Timeout}, _TRef}} -> TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), @@ -484,11 +484,12 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, Session}; -handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - client_id = ClientId, - client_pid = ClientPid, +handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, + client_id = ClientId, + client_pid = ClientPid, expired_after = Expires}) -> - lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), + lager:info("Session ~s unlink with client ~p: reason=~p", + [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; @@ -541,7 +542,7 @@ check_inflight(#session{max_inflight = Max, inflight_queue = Q}) -> check_awaiting_rel(#session{max_awaiting_rel = 0}) -> true; -check_awaiting_rel(#session{awaiting_rel = AwaitingRel, +check_awaiting_rel(#session{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) -> maps:size(AwaitingRel) < MaxLen.