session, finally count down

This commit is contained in:
Feng Lee 2015-06-17 01:25:08 +08:00
parent 08a64ee97b
commit 38e0ba08d2
4 changed files with 278 additions and 296 deletions

View File

@ -1,71 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% Inflight window of message queue. Wrap a list with len.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_inflight).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-export([new/2, is_full/1, len/1, in/2, ack/2]).
-define(MAX_SIZE, 100).
-record(inflight, {name, q = [], len = 0, size = ?MAX_SIZE}).
-type inflight() :: #inflight{}.
-export_type([inflight/0]).
new(Name, Max) ->
#inflight{name = Name, size = Max}.
is_full(#inflight{size = 0}) ->
false;
is_full(#inflight{len = Len, size = Size}) when Len < Size ->
false;
is_full(_Inflight) ->
true.
len(#inflight{len = Len}) ->
Len.
in(_Msg, #inflight{len = Len, size = Size})
when Len =:= Size -> {error, full};
in(Msg = #mqtt_message{msgid = MsgId}, Inflight = #inflight{q = Q, len = Len}) ->
{ok, Inflight#inflight{q = [{MsgId, Msg}|Q], len = Len +1}}.
ack(MsgId, Inflight = #inflight{q = Q, len = Len}) ->
case lists:keyfind(MsgId, 1, Q) of
false ->
lager:error("Inflight(~s) cannot find msgid: ~p", [MsgId]),
Inflight;
_Msg ->
Inflight#inflight{q = lists:keydelete(MsgId, 1, Q), len = Len - 1}
end.

View File

@ -112,28 +112,21 @@ len(#mqueue{len = Len}) -> Len.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec in({new | old, mqtt_message()}, mqueue()) -> mqueue(). -spec in({newcome | pending, mqtt_message()}, mqueue()) -> mqueue().
%% drop qos0 %% drop qos0
in({_, #mqtt_message{qos = ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> in({_, #mqtt_message{qos = ?QOS_0}}, MQ = #mqueue{qos0 = false}) ->
MQ; MQ;
%% simply drop the oldest one if queue is full, improve later %% simply drop the oldest one if queue is full, improve later
in({new, Msg}, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen}) in(Msg, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen})
when Len =:= MaxLen -> when Len =:= MaxLen ->
{{value, OldMsg}, Q2} = queue:out(Q), {{value, OldMsg}, Q2} = queue:out(Q),
lager:error("queue(~s) drop message: ~p", [Name, OldMsg]), lager:error("MQueue(~s) drop message: ~p", [Name, OldMsg]),
MQ#mqueue{q = queue:in(Msg, Q2)}; MQ#mqueue{q = queue:in(Msg, Q2)};
in({old, Msg}, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen}) in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
when Len =:= MaxLen -> maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}).
lager:error("queue(~s) drop message: ~p", [Name, Msg]), MQ;
in({new, Msg}, MQ = #mqueue{q = Q, len = Len}) ->
maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
in({old, Msg}, MQ = #mqueue{q = Q, len = Len}) ->
MQ#mqueue{q = queue:in_r(Msg, Q), len = Len + 1}.
out(MQ = #mqueue{len = 0}) -> out(MQ = #mqueue{len = 0}) ->
{empty, MQ}; {empty, MQ};

View File

@ -123,7 +123,7 @@ redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
dispatch(_CPid, []) -> dispatch(_CPid, []) ->
ignore; ignore;
dispatch(CPid, Msgs) when is_list(Msgs) -> dispatch(CPid, Msgs) when is_list(Msgs) ->
CPid ! {dispatch, [Msg || Msg <- Msgs]}; [CPid ! {dispatch, Msg} || Msg <- Msgs];
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
CPid ! {dispatch, Msg}. CPid ! {dispatch, Msg}.

View File

@ -40,6 +40,8 @@
%%% %%%
%%% 5. Optionally, QoS 0 messages pending transmission to the Client. %%% 5. Optionally, QoS 0 messages pending transmission to the Client.
%%% %%%
%%% State of Message: newcome, inflight, pending
%%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -86,13 +88,15 @@
%% QoS 1 and QoS 2 messages which have been sent to the Client, %% QoS 1 and QoS 2 messages which have been sent to the Client,
%% but have not been completely acknowledged. %% but have not been completely acknowledged.
%% Client <- Broker %% Client <- Broker
inflight_queue :: emqttd_inflight:inflight(), inflight_queue :: list(),
max_inflight = 0,
%% All qos1, qos2 messages published to when client is disconnected. %% All qos1, qos2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client. %% QoS 1 and QoS 2 messages pending transmission to the Client.
%% %%
%% Optionally, QoS 0 messages pending transmission to the Client. %% Optionally, QoS 0 messages pending transmission to the Client.
pending_queue :: emqttd_mqueue:mqueue(), message_queue :: emqttd_mqueue:mqueue(),
%% Inflight qos2 messages received from client and waiting for pubrel. %% Inflight qos2 messages received from client and waiting for pubrel.
%% QoS 2 messages which have been received from the Client, %% QoS 2 messages which have been received from the Client,
@ -100,25 +104,26 @@
%% Client -> Broker %% Client -> Broker
awaiting_rel :: map(), awaiting_rel :: map(),
%% Awaiting PUBREL timeout
await_rel_timeout = 8,
%% Max Packets that Awaiting PUBREL
max_awaiting_rel = 100,
%% Awaiting timers for ack, rel and comp. %% Awaiting timers for ack, rel and comp.
awaiting_ack :: map(), awaiting_ack :: map(),
awaiting_comp :: map(),
%% Retries to resend the unacked messages %% Retries to resend the unacked messages
unack_retries = 3, unack_retries = 3,
%% 4, 8, 16 seconds if 3 retries:) %% 4, 8, 16 seconds if 3 retries:)
unack_timeout = 4, unack_timeout = 4,
%% Awaiting PUBREL timeout %% Awaiting for PUBCOMP
await_rel_timeout = 8, awaiting_comp :: map(),
%% Max Packets that Awaiting PUBREL
max_awaiting_rel = 100,
%% session expired after 48 hours %% session expired after 48 hours
expired_after = 48, expired_after = 172800,
expired_timer, expired_timer,
@ -128,7 +133,7 @@
%% @doc Start a session. %% @doc Start a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(boolean(), binary(), pid()) -> {ok, pid()} | {error, any()}. -spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}.
start_link(CleanSess, ClientId, ClientPid) -> start_link(CleanSess, ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
@ -136,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) ->
%% @doc Resume a session. %% @doc Resume a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(pid(), binary(), pid()) -> ok. -spec resume(pid(), mqtt_clientid(), pid()) -> ok.
resume(Session, ClientId, ClientPid) -> resume(Session, ClientId, ClientPid) ->
gen_server:cast(Session, {resume, ClientId, ClientPid}). gen_server:cast(Session, {resume, ClientId, ClientPid}).
@ -144,7 +149,7 @@ resume(Session, ClientId, ClientPid) ->
%% @doc Destroy a session. %% @doc Destroy a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy(Session:: pid(), ClientId :: binary()) -> ok. -spec destroy(pid(), mqtt_clientid()) -> ok.
destroy(Session, ClientId) -> destroy(Session, ClientId) ->
gen_server:call(Session, {destroy, ClientId}). gen_server:call(Session, {destroy, ClientId}).
@ -160,7 +165,7 @@ subscribe(Session, TopicTable) ->
%% @doc Publish message %% @doc Publish message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok. -spec publish(pid(), mqtt_message()) -> ok.
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
%% publish qos0 directly %% publish qos0 directly
emqttd_pubsub:publish(Msg); emqttd_pubsub:publish(Msg);
@ -210,15 +215,14 @@ init([CleanSess, ClientId, ClientPid]) ->
true = link(ClientPid), true = link(ClientPid),
QEnv = emqttd:env(mqtt, queue), QEnv = emqttd:env(mqtt, queue),
SessEnv = emqttd:env(mqtt, session), SessEnv = emqttd:env(mqtt, session),
PendingQ = emqttd_mqueue:new(ClientId, QEnv),
InflightQ = emqttd_inflight:new(ClientId, emqttd_opts:g(max_inflight, SessEnv)),
Session = #session{ Session = #session{
clean_sess = CleanSess, clean_sess = CleanSess,
clientid = ClientId, clientid = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
subscriptions = [], subscriptions = [],
inflight_queue = InflightQ, inflight_queue = [],
pending_queue = PendingQ, max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0),
message_queue = emqttd_mqueue:new(ClientId, QEnv),
awaiting_rel = #{}, awaiting_rel = #{},
awaiting_ack = #{}, awaiting_ack = #{},
awaiting_comp = #{}, awaiting_comp = #{},
@ -243,10 +247,11 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
lists:foldl(fun({Topic, Qos}, Acc) -> lists:foldl(fun({Topic, Qos}, Acc) ->
case lists:keyfind(Topic, 1, Acc) of case lists:keyfind(Topic, 1, Acc) of
{Topic, Qos} -> {Topic, Qos} ->
lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; lager:warning([{client, ClientId}], "Session ~s "
{Topic, Old} -> "resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc;
lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: old qos=~p, new qos=~p", {Topic, OldQos} ->
[ClientId, Topic, Old, Qos]), lager:warning([{client, ClientId}], "Session ~s "
"resubscribe ~p: old qos=~p, new qos=~p", [ClientId, Topic, OldQos, Qos]),
lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
false -> false ->
%%TODO: the design is ugly, rewrite later...:( %%TODO: the design is ugly, rewrite later...:(
@ -259,11 +264,13 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
end, Subscriptions, Topics), end, Subscriptions, Topics),
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) -> handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId,
subscriptions = Subscriptions}) ->
%% unsubscribe from topic tree %% unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),
lager:info([{client, ClientId}], "Session ~s unsubscribe ~p.", [ClientId, Topics]),
lager:info([{client, ClientId}], "Session ~s unsubscribe ~p", [ClientId, Topics]),
Subscriptions1 = Subscriptions1 =
lists:foldl(fun(Topic, Acc) -> lists:foldl(fun(Topic, Acc) ->
@ -271,21 +278,24 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId
{Topic, _Qos} -> {Topic, _Qos} ->
lists:keydelete(Topic, 1, Acc); lists:keydelete(Topic, 1, Acc);
false -> false ->
lager:warning([{client, ClientId}], "~s not subscribe ~s", [ClientId, Topic]), Acc lager:warning([{client, ClientId}], "Session ~s not subscribe ~s", [ClientId, Topic]), Acc
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
{reply, ok, Session#session{subscriptions = Subscriptions1}}; {reply, ok, Session#session{subscriptions = Subscriptions1}};
handle_call({publish, Message = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> Session = #session{clientid = ClientId,
awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of case check_awaiting_rel(Session) of
true -> true ->
TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}), TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
{reply, ok, Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}}; AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false -> false ->
lager:error([{clientid, ClientId}], "Session ~s " lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message "
" dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]), "for too many awaiting_rel: ~p", [ClientId, Msg]),
{reply, {error, dropped}, Session} {reply, {error, dropped}, Session}
end; end;
@ -297,59 +307,52 @@ handle_call(Req, _From, State) ->
lager:critical("Unexpected Request: ~p", [Req]), lager:critical("Unexpected Request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session{ handle_cast({resume, ClientId, ClientPid}, Session) ->
clientid = ClientId,
#session{clientid = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
pending_queue = Queue, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
expired_timer = ETimer}) -> expired_timer = ETimer} = Session,
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
%% kick old client... %% cancel expired timer
if cancel_timer(ETimer),
OldClientPid =:= undefined ->
ok;
OldClientPid =:= ClientPid ->
ok;
true ->
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
unlink(OldClientPid),
OldClientPid ! {stop, duplicate_id, ClientPid}
end,
%% cancel timeout timer kick(ClientId, ClientPid, OldClientPid),
emqttd_util:cancel_timer(ETimer),
%% redelivery PUBREL %% Redeliver PUBREL
lists:foreach(fun(MsgId) -> [ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)],
ClientPid ! {redeliver, {?PUBREL, MsgId}}
end, maps:keys(AwaitingComp)),
%% redelivery messages that awaiting PUBACK or PUBREC %% Clear awaiting_ack timers
Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, [cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)],
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Dup(Msg)}}
end, maps:values(AwaitingAck)),
%% send offline messages %% Clear awaiting_comp timers
lists:foreach(fun(Msg) -> [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
ClientPid ! {dispatch, {self(), Msg}}
end, emqttd_queue:all(Queue)),
{noreply, State#session{client_pid = ClientPid, Session1 = Session#session{client_pid = ClientPid,
%%TODO: awaiting_ack = #{},
pending_queue = emqttd_queue:clear(Queue), awaiting_comp = #{},
expired_timer = undefined}, hibernate}; expired_timer = undefined},
%% Redeliver inflight messages
Session2 =
lists:foldl(fun({_Id, Msg}, Sess) ->
redeliver(Msg#mqtt_message{dup = true}, Sess)
end, Session1, lists:reverse(InflightQ)),
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) -> %% Dequeue pending messages
{noreply, dequeue(Session2), hibernate};
%% PUBRAC
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, {_, TRef}} -> {ok, {_, TRef}} ->
catch erlang:cancel_timer(TRef), cancel_timer(TRef),
{noreply, dispatch(Session#session{inflight_queue = emqttd_inflight:ack(MsgId, Q), Session1 = acked(MsgId, Session),
awaiting_ack = maps:remove(MsgId, Awaiting)})}; {noreply, dequeue(Session1)};
error -> error ->
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]), lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
@ -362,33 +365,36 @@ handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
case maps:find(MsgId, AwaitingAck) of case maps:find(MsgId, AwaitingAck) of
{ok, {_, TRef}} -> {ok, {_, TRef}} ->
catch erlang:cancel_timer(TRef), cancel_timer(TRef),
TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}), TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}),
{noreply, dispatch(Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck), Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}),
awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)})}; {noreply, dequeue(Session1)};
error -> error ->
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]), lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
end; end;
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel}) -> %% PUBREL
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId,
awaiting_rel = AwaitingRel}) ->
case maps:find(MsgId, AwaitingRel) of case maps:find(MsgId, AwaitingRel) of
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
catch erlang:cancel_timer(TRef), cancel_timer(TRef),
emqttd_pubsub:publish(Msg), emqttd_pubsub:publish(Msg),
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
error -> error ->
lager:error("Session ~s cannot find PUBREL '~p'!", [ClientId, MsgId]), lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
end; end;
%% PUBCOMP %% PUBCOMP
handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) ->
case maps:is_key(MsgId, AwaitingComp) of case maps:find(MsgId, AwaitingComp) of
true -> {ok, TRef} ->
cancel_timer(TRef),
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}}; {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}};
false -> error ->
lager:error("Session ~s cannot find PUBREC MsgId '~p'", [ClientId, MsgId]), lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
end; end;
@ -396,61 +402,103 @@ handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
{noreply, State}. {noreply, State}.
handle_info({dispatch, MsgList}, Session) when is_list(MsgList) -> %% Queue messages when client is offline
NewSession = lists:foldl(fun(Msg, S) -> handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
dispatch({new, Msg}, S) message_queue = Q})
end, Session, MsgList), when is_record(Msg, mqtt_message) ->
{noreply, NewSession}; {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}};
handle_info({dispatch, {old, Msg}}, Session) when is_record(Msg, mqtt_message) -> %% Dispatch qos0 message directly to client
{noreply, dispatch({old, Msg}, Session)}; handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg},
{noreply, Session};
handle_info({dispatch, Msg}, Session) when is_record(Msg, mqtt_message) -> handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
{noreply, dispatch({new, Msg}, Session)}; Session = #session{clientid = ClientId, message_queue = MsgQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, case check_inflight(Session) of
clientid = ClientId, true ->
client_pid = ClientPid, {noreply, deliver(Msg, Session)};
expired_after = Expires}) -> false ->
%%TODO: Clean puback, pubrel, pubcomp timers lager:warning([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]),
lager:info("Session ~s: client ~p exited for ~p", [ClientId, ClientPid, Reason]), {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
TRef = timer(Expires * 1000, session_expired), end;
{noreply, Session#session{expired_timer = TRef}};
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined,
%%TODO: reason... awaiting_ack = AwaitingAck}) ->
{stop, normal, Session}; %% just remove awaiting
{noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}};
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId,
lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), inflight_queue = InflightQ,
{noreply, State}; awaiting_ack = AwaitingAck}) ->
case maps:find(MsgId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} ->
Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
awaiting_ack = maps:remove(MsgId, AwaitingAck)},
{noreply, dequeue(Session1)};
{ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
{noreply, Session#session{awaiting_ack = AwaitingAck1}};
error ->
lager:error([{client, ClientId}], "Session ~s "
"cannot find Awaiting Ack:~p", [ClientId, MsgId]),
{noreply, Session}
end;
handle_info(session_expired, State = #session{clientid = ClientId}) -> handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId,
lager:error("Session ~s expired, shutdown now!", [ClientId]), awaiting_rel = AwaitingRel}) ->
{stop, {shutdown, expired}, State}; case maps:find(MsgId, AwaitingRel) of
handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
case maps:find(MsgId, Awaiting) of
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n"
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}}; "Drop Message:~p", [ClientId, Msg]),
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
error -> error ->
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]), lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
end; end;
handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = Awaiting}) -> handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId,
awaiting_comp = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, _TRef} -> {ok, _TRef} ->
lager:error([{client, ClientId}], "Session ~s Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), lager:error([{client, ClientId}], "Session ~s "
"Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]),
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}};
error -> error ->
lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), lager:error([{client, ClientId}], "Session ~s "
"Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]),
{noreply, Session} {noreply, Session}
end; end;
handle_info(Info, Session) -> handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
lager:critical("Unexpected Info: ~p, Session: ~p", [Info, Session]), client_pid = ClientPid}) ->
{stop, normal, Session};
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
clientid = ClientId,
client_pid = ClientPid,
expired_after = Expires}) ->
lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]),
TRef = timer(Expires, session_expired),
{noreply, Session#session{expired_timer = TRef}, hibernate};
handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId,
client_pid = ClientPid}) ->
lager:error("Session ~s received unexpected EXIT:"
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]),
{noreply, Session};
handle_info(session_expired, Session = #session{clientid = ClientId}) ->
lager:error("Session ~s expired, shutdown now!", [ClientId]),
{stop, {shutdown, expired}, Session};
handle_info(Info, Session = #session{clientid = ClientId}) ->
lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
{noreply, Session}. {noreply, Session}.
terminate(_Reason, _Session) -> terminate(_Reason, _Session) ->
@ -464,11 +512,26 @@ code_change(_OldVsn, Session, _Extra) ->
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @private %% Kick duplicated client
%% @doc Plubish Qos2 message from client -> broker, and then wait for pubrel.
%% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
kick(_ClientId, _ClientPid, undefined) ->
ok;
kick(_ClientId, ClientPid, ClientPid) ->
ok;
kick(ClientId, ClientPid, OldClientPid) ->
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
unlink(OldClientPid),
OldClientPid ! {stop, duplicate_id, ClientPid}.
%%------------------------------------------------------------------------------
%% Check inflight and awaiting_rel
%%------------------------------------------------------------------------------
check_inflight(#session{max_inflight = 0}) ->
true;
check_inflight(#session{max_inflight = Max, inflight_queue = Q}) ->
Max > length(Q).
check_awaiting_rel(#session{max_awaiting_rel = 0}) -> check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
true; true;
@ -476,72 +539,61 @@ check_awaiting_rel(#session{awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxLen}) -> max_awaiting_rel = MaxLen}) ->
maps:size(AwaitingRel) < MaxLen. maps:size(AwaitingRel) < MaxLen.
%%%============================================================================= %%------------------------------------------------------------------------------
%%% Dispatch message from broker -> client. %% Dequeue and Deliver
%%%============================================================================= %%------------------------------------------------------------------------------
dispatch(Session = #session{client_pid = undefined}) -> dequeue(Session = #session{client_pid = undefined}) ->
%% do nothing %% do nothing if client is disconnected
Session; Session;
dispatch(Session = #session{pending_queue = PendingQ}) -> dequeue(Session) ->
case emqttd_mqueue:out(PendingQ) of case check_inflight(Session) of
{empty, _Q} -> true -> dequeue2(Session);
Session; false -> Session
{{value, Msg}, Q1} ->
self() ! {dispatch, {old, Msg}},
Session#session{pending_queue = Q1}
end. end.
%% queued the message if client is offline dequeue2(Session = #session{message_queue = Q}) ->
dispatch({Type, Msg}, Session = #session{client_pid = undefined, case emqttd_mqueue:out(Q) of
pending_queue= PendingQ}) -> {empty, _Q} -> Session;
Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)}; {{value, Msg}, Q1} ->
Session1 = deliver(Msg, Session#session{message_queue = Q1}),
dequeue(Session1) %% dequeue more
end.
%% dispatch qos0 directly to client process deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
dispatch({_Type, Msg} = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {deliver, Msg}, Session; ClientPid ! {deliver, Msg}, Session;
%% dispatch qos1/2 message and wait for puback deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId,
dispatch({Type, Msg = #mqtt_message{qos = Qos}}, Session = #session{clientid = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
message_id = MsgId,
pending_queue = PendingQ,
inflight_queue = InflightQ}) inflight_queue = InflightQ})
when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 -> when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
%% assign id first Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false},
Msg1 = Msg#mqtt_message{msgid = MsgId},
Msg2 =
if
Qos =:= ?QOS_1 -> Msg1;
Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false}
end,
case emqttd_inflight:in(Msg1, InflightQ) of
{error, full} ->
lager:error("Session ~s inflight queue is full!", [ClientId]),
Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)};
{ok, InflightQ1} ->
ClientPid ! {deliver, Msg1}, ClientPid ! {deliver, Msg1},
await_ack(Msg1, next_msgid(Session#session{inflight_queue = InflightQ1})) await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})).
end.
deliver(Msg, Session) -> redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
ok. deliver(Msg, Session);
await(Msg, Session) -> redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid})
ok. when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
ClientPid ! {deliver, Msg},
await(Msg, Session).
% message(qos1/2) is awaiting ack %%------------------------------------------------------------------------------
await_ack(Msg = #mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, %% Awaiting ack for qos1, qos2 message
%%------------------------------------------------------------------------------
await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting,
unack_retries = Retries, unack_retries = Retries,
unack_timeout = Timeout}) -> unack_timeout = Timeout}) ->
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
TRef = timer(Timeout * 1000, {retry, MsgId}),
Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting), Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting),
Session#session{awaiting_ack = Awaiting1}. Session#session{awaiting_ack = Awaiting1}.
timer(Timeout, TimeoutMsg) -> acked(MsgId, Session = #session{inflight_queue = InflightQ,
erlang:send_after(Timeout * 1000, self(), TimeoutMsg). awaiting_ack = Awaiting}) ->
Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
awaiting_ack = maps:remove(MsgId, Awaiting)}.
next_msgid(Session = #session{message_id = 16#ffff}) -> next_msgid(Session = #session{message_id = 16#ffff}) ->
Session#session{message_id = 1}; Session#session{message_id = 1};
@ -549,3 +601,11 @@ next_msgid(Session = #session{message_id = 16#ffff}) ->
next_msgid(Session = #session{message_id = MsgId}) -> next_msgid(Session = #session{message_id = MsgId}) ->
Session#session{message_id = MsgId + 1}. Session#session{message_id = MsgId + 1}.
timer(Timeout, TimeoutMsg) ->
erlang:send_after(Timeout * 1000, self(), TimeoutMsg).
cancel_timer(undefined) ->
undefined;
cancel_timer(Ref) ->
catch erlang:cancel_timer(Ref).