diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 930454667..4a8a35933 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -354,13 +354,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> %% Dequeue pending messages noreply(dequeue(Session2)); -%% PUBRAC -handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> - case maps:find(PktId, Awaiting) of +%% PUBACK +handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) -> + case maps:find(PktId, AwaitingAck) of {ok, {_, TRef}} -> cancel_timer(TRef), - Session1 = acked(PktId, Session), - noreply(dequeue(Session1)); + noreply(dequeue(acked(PktId, Session))); error -> lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), noreply(Session) @@ -375,7 +374,8 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, {ok, {_, TRef}} -> cancel_timer(TRef), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), - Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), + AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), + Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), noreply(dequeue(Session1)); error -> lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), @@ -439,9 +439,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, - awaiting_ack = AwaitingAck}) -> + awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), @@ -564,10 +564,11 @@ dequeue(Session) -> dequeue2(Session = #session{message_queue = Q}) -> case emqttd_mqueue:out(Q) of - {empty, _Q} -> Session; + {empty, _Q} -> + Session; {{value, Msg}, Q1} -> - Session1 = deliver(Msg, Session#session{message_queue = Q1}), - dequeue(Session1) %% dequeue more + %% dequeue more + dequeue(deliver(Msg, Session#session{message_queue = Q1})) end. deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> @@ -608,8 +609,8 @@ acked(PktId, Session = #session{client_id = ClientId, false -> lager:error("Session(~s) cannot find acked message: ~p", [PktId]) end, - Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), - awaiting_ack = maps:remove(PktId, Awaiting)}. + Session#session{awaiting_ack = maps:remove(PktId, Awaiting), + inflight_queue = lists:keydelete(PktId, 1, InflightQ)}. next_packet_id(Session = #session{packet_id = 16#ffff}) -> Session#session{packet_id = 1}; @@ -627,3 +628,4 @@ cancel_timer(Ref) -> noreply(State) -> {noreply, State, hibernate}. +