misc
This commit is contained in:
parent
c65d047fda
commit
cefff9c0eb
|
@ -354,13 +354,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
|||
%% Dequeue pending messages
|
||||
noreply(dequeue(Session2));
|
||||
|
||||
%% PUBRAC
|
||||
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
|
||||
case maps:find(PktId, Awaiting) of
|
||||
%% PUBACK
|
||||
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) ->
|
||||
case maps:find(PktId, AwaitingAck) of
|
||||
{ok, {_, TRef}} ->
|
||||
cancel_timer(TRef),
|
||||
Session1 = acked(PktId, Session),
|
||||
noreply(dequeue(Session1));
|
||||
noreply(dequeue(acked(PktId, Session)));
|
||||
error ->
|
||||
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
|
||||
noreply(Session)
|
||||
|
@ -375,7 +374,8 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
|
|||
{ok, {_, TRef}} ->
|
||||
cancel_timer(TRef),
|
||||
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
|
||||
Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}),
|
||||
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
|
||||
Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
|
||||
noreply(dequeue(Session1));
|
||||
error ->
|
||||
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
|
||||
|
@ -439,9 +439,9 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde
|
|||
%% just remove awaiting
|
||||
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
|
||||
|
||||
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
|
||||
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
|
||||
inflight_queue = InflightQ,
|
||||
awaiting_ack = AwaitingAck}) ->
|
||||
awaiting_ack = AwaitingAck}) ->
|
||||
case maps:find(PktId, AwaitingAck) of
|
||||
{ok, {{0, _Timeout}, _TRef}} ->
|
||||
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
||||
|
@ -564,10 +564,11 @@ dequeue(Session) ->
|
|||
|
||||
dequeue2(Session = #session{message_queue = Q}) ->
|
||||
case emqttd_mqueue:out(Q) of
|
||||
{empty, _Q} -> Session;
|
||||
{empty, _Q} ->
|
||||
Session;
|
||||
{{value, Msg}, Q1} ->
|
||||
Session1 = deliver(Msg, Session#session{message_queue = Q1}),
|
||||
dequeue(Session1) %% dequeue more
|
||||
%% dequeue more
|
||||
dequeue(deliver(Msg, Session#session{message_queue = Q1}))
|
||||
end.
|
||||
|
||||
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
|
||||
|
@ -608,8 +609,8 @@ acked(PktId, Session = #session{client_id = ClientId,
|
|||
false ->
|
||||
lager:error("Session(~s) cannot find acked message: ~p", [PktId])
|
||||
end,
|
||||
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
||||
awaiting_ack = maps:remove(PktId, Awaiting)}.
|
||||
Session#session{awaiting_ack = maps:remove(PktId, Awaiting),
|
||||
inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.
|
||||
|
||||
next_packet_id(Session = #session{packet_id = 16#ffff}) ->
|
||||
Session#session{packet_id = 1};
|
||||
|
@ -627,3 +628,4 @@ cancel_timer(Ref) ->
|
|||
|
||||
noreply(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue