This commit is contained in:
Feng 2015-07-08 22:12:33 +08:00
parent 86cfbf8c43
commit 711a875e23
1 changed files with 11 additions and 10 deletions

View File

@ -313,7 +313,7 @@ handle_call(Req, _From, State) ->
handle_cast({resume, ClientId, ClientPid}, Session) -> handle_cast({resume, ClientId, ClientPid}, Session) ->
#session{client_id = ClientId, #session{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
@ -417,18 +417,18 @@ handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
%% Dispatch qos0 message directly to client %% Dispatch qos0 message directly to client
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
Session = #session{client_pid = ClientPid}) -> Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg}, ClientPid ! {deliver, Msg},
{noreply, Session}; {noreply, Session};
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
Session = #session{client_id = ClientId, message_queue = MsgQ}) Session = #session{client_id = ClientId, message_queue = MsgQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case check_inflight(Session) of case check_inflight(Session) of
true -> true ->
{noreply, deliver(Msg, Session)}; {noreply, deliver(Msg, Session)};
false -> false ->
lager:warning([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]), lager:error([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]),
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
end; end;
@ -443,7 +443,7 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = Clien
case maps:find(PktId, AwaitingAck) of case maps:find(PktId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} -> {ok, {{0, _Timeout}, _TRef}} ->
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
awaiting_ack = maps:remove(PktId, AwaitingAck)}, awaiting_ack = maps:remove(PktId, AwaitingAck)},
{noreply, dequeue(Session1)}; {noreply, dequeue(Session1)};
{ok, {{Retries, Timeout}, _TRef}} -> {ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
@ -484,11 +484,12 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
{stop, normal, Session}; {stop, normal, Session};
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
client_id = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
expired_after = Expires}) -> expired_after = Expires}) ->
lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), lager:info("Session ~s unlink with client ~p: reason=~p",
[ClientId, ClientPid, Reason]),
TRef = timer(Expires, session_expired), TRef = timer(Expires, session_expired),
{noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate};
@ -541,7 +542,7 @@ check_inflight(#session{max_inflight = Max, inflight_queue = Q}) ->
check_awaiting_rel(#session{max_awaiting_rel = 0}) -> check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
true; true;
check_awaiting_rel(#session{awaiting_rel = AwaitingRel, check_awaiting_rel(#session{awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxLen}) -> max_awaiting_rel = MaxLen}) ->
maps:size(AwaitingRel) < MaxLen. maps:size(AwaitingRel) < MaxLen.