diff --git a/apps/emqttd/src/emqttd_inflight.erl b/apps/emqttd/src/emqttd_inflight.erl deleted file mode 100644 index 8f3df986c..000000000 --- a/apps/emqttd/src/emqttd_inflight.erl +++ /dev/null @@ -1,71 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% Inflight window of message queue. Wrap a list with len. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --module(emqttd_inflight). - --author("Feng Lee "). - --include("emqttd.hrl"). - --export([new/2, is_full/1, len/1, in/2, ack/2]). - --define(MAX_SIZE, 100). - --record(inflight, {name, q = [], len = 0, size = ?MAX_SIZE}). - --type inflight() :: #inflight{}. - --export_type([inflight/0]). - -new(Name, Max) -> - #inflight{name = Name, size = Max}. - -is_full(#inflight{size = 0}) -> - false; -is_full(#inflight{len = Len, size = Size}) when Len < Size -> - false; -is_full(_Inflight) -> - true. - -len(#inflight{len = Len}) -> - Len. - -in(_Msg, #inflight{len = Len, size = Size}) - when Len =:= Size -> {error, full}; - -in(Msg = #mqtt_message{msgid = MsgId}, Inflight = #inflight{q = Q, len = Len}) -> - {ok, Inflight#inflight{q = [{MsgId, Msg}|Q], len = Len +1}}. - -ack(MsgId, Inflight = #inflight{q = Q, len = Len}) -> - case lists:keyfind(MsgId, 1, Q) of - false -> - lager:error("Inflight(~s) cannot find msgid: ~p", [MsgId]), - Inflight; - _Msg -> - Inflight#inflight{q = lists:keydelete(MsgId, 1, Q), len = Len - 1} - end. - diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index c9c7527be..85d598844 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -112,28 +112,21 @@ len(#mqueue{len = Len}) -> Len. %% @end %%------------------------------------------------------------------------------ --spec in({new | old, mqtt_message()}, mqueue()) -> mqueue(). +-spec in({newcome | pending, mqtt_message()}, mqueue()) -> mqueue(). %% drop qos0 in({_, #mqtt_message{qos = ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> MQ; %% simply drop the oldest one if queue is full, improve later -in({new, Msg}, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen}) +in(Msg, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen}) when Len =:= MaxLen -> {{value, OldMsg}, Q2} = queue:out(Q), - lager:error("queue(~s) drop message: ~p", [Name, OldMsg]), + lager:error("MQueue(~s) drop message: ~p", [Name, OldMsg]), MQ#mqueue{q = queue:in(Msg, Q2)}; -in({old, Msg}, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen}) - when Len =:= MaxLen -> - lager:error("queue(~s) drop message: ~p", [Name, Msg]), MQ; - -in({new, Msg}, MQ = #mqueue{q = Q, len = Len}) -> - maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); - -in({old, Msg}, MQ = #mqueue{q = Q, len = Len}) -> - MQ#mqueue{q = queue:in_r(Msg, Q), len = Len + 1}. +in(Msg, MQ = #mqueue{q = Q, len = Len}) -> + maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}). out(MQ = #mqueue{len = 0}) -> {empty, MQ}; diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index f304d0aad..fb2df9171 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -123,7 +123,7 @@ redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> dispatch(_CPid, []) -> ignore; dispatch(CPid, Msgs) when is_list(Msgs) -> - CPid ! {dispatch, [Msg || Msg <- Msgs]}; + [CPid ! {dispatch, Msg} || Msg <- Msgs]; dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> CPid ! {dispatch, Msg}. diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 384e49ae1..5cba1619d 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -40,6 +40,8 @@ %%% %%% 5. Optionally, QoS 0 messages pending transmission to the Client. %%% +%%% State of Message: newcome, inflight, pending +%%% %%% @end %%%----------------------------------------------------------------------------- @@ -86,13 +88,15 @@ %% QoS 1 and QoS 2 messages which have been sent to the Client, %% but have not been completely acknowledged. %% Client <- Broker - inflight_queue :: emqttd_inflight:inflight(), + inflight_queue :: list(), + + max_inflight = 0, %% All qos1, qos2 messages published to when client is disconnected. %% QoS 1 and QoS 2 messages pending transmission to the Client. %% %% Optionally, QoS 0 messages pending transmission to the Client. - pending_queue :: emqttd_mqueue:mqueue(), + message_queue :: emqttd_mqueue:mqueue(), %% Inflight qos2 messages received from client and waiting for pubrel. %% QoS 2 messages which have been received from the Client, @@ -100,25 +104,26 @@ %% Client -> Broker awaiting_rel :: map(), + %% Awaiting PUBREL timeout + await_rel_timeout = 8, + + %% Max Packets that Awaiting PUBREL + max_awaiting_rel = 100, + %% Awaiting timers for ack, rel and comp. awaiting_ack :: map(), - awaiting_comp :: map(), - %% Retries to resend the unacked messages unack_retries = 3, %% 4, 8, 16 seconds if 3 retries:) unack_timeout = 4, - %% Awaiting PUBREL timeout - await_rel_timeout = 8, - - %% Max Packets that Awaiting PUBREL - max_awaiting_rel = 100, + %% Awaiting for PUBCOMP + awaiting_comp :: map(), %% session expired after 48 hours - expired_after = 48, + expired_after = 172800, expired_timer, @@ -128,7 +133,7 @@ %% @doc Start a session. %% @end %%------------------------------------------------------------------------------ --spec start_link(boolean(), binary(), pid()) -> {ok, pid()} | {error, any()}. +-spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}. start_link(CleanSess, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). @@ -136,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) -> %% @doc Resume a session. %% @end %%------------------------------------------------------------------------------ --spec resume(pid(), binary(), pid()) -> ok. +-spec resume(pid(), mqtt_clientid(), pid()) -> ok. resume(Session, ClientId, ClientPid) -> gen_server:cast(Session, {resume, ClientId, ClientPid}). @@ -144,7 +149,7 @@ resume(Session, ClientId, ClientPid) -> %% @doc Destroy a session. %% @end %%------------------------------------------------------------------------------ --spec destroy(Session:: pid(), ClientId :: binary()) -> ok. +-spec destroy(pid(), mqtt_clientid()) -> ok. destroy(Session, ClientId) -> gen_server:call(Session, {destroy, ClientId}). @@ -160,7 +165,7 @@ subscribe(Session, TopicTable) -> %% @doc Publish message %% @end %%------------------------------------------------------------------------------ --spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok. +-spec publish(pid(), mqtt_message()) -> ok. publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> %% publish qos0 directly emqttd_pubsub:publish(Msg); @@ -210,24 +215,23 @@ init([CleanSess, ClientId, ClientPid]) -> true = link(ClientPid), QEnv = emqttd:env(mqtt, queue), SessEnv = emqttd:env(mqtt, session), - PendingQ = emqttd_mqueue:new(ClientId, QEnv), - InflightQ = emqttd_inflight:new(ClientId, emqttd_opts:g(max_inflight, SessEnv)), Session = #session{ - clean_sess = CleanSess, - clientid = ClientId, - client_pid = ClientPid, - subscriptions = [], - inflight_queue = InflightQ, - pending_queue = PendingQ, - awaiting_rel = #{}, - awaiting_ack = #{}, - awaiting_comp = #{}, - unack_retries = emqttd_opts:g(unack_retries, SessEnv), - unack_timeout = emqttd_opts:g(unack_timeout, SessEnv), - await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), - max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), - expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, - timestamp = os:timestamp()}, + clean_sess = CleanSess, + clientid = ClientId, + client_pid = ClientPid, + subscriptions = [], + inflight_queue = [], + max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), + message_queue = emqttd_mqueue:new(ClientId, QEnv), + awaiting_rel = #{}, + awaiting_ack = #{}, + awaiting_comp = #{}, + unack_retries = emqttd_opts:g(unack_retries, SessEnv), + unack_timeout = emqttd_opts:g(unack_timeout, SessEnv), + await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), + max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), + expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, + timestamp = os:timestamp()}, {ok, Session, hibernate}. handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, @@ -237,33 +241,36 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p", - [ClientId, Topics, GrantedQos]), + [ClientId, Topics, GrantedQos]), Subscriptions1 = lists:foldl(fun({Topic, Qos}, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, Qos} -> - lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; - {Topic, Old} -> - lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: old qos=~p, new qos=~p", - [ClientId, Topic, Old, Qos]), - lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); - false -> - %%TODO: the design is ugly, rewrite later...:( - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_msg_store:redeliver(Topic, self()), - [{Topic, Qos} | Acc] - end - end, Subscriptions, Topics), + case lists:keyfind(Topic, 1, Acc) of + {Topic, Qos} -> + lager:warning([{client, ClientId}], "Session ~s " + "resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; + {Topic, OldQos} -> + lager:warning([{client, ClientId}], "Session ~s " + "resubscribe ~p: old qos=~p, new qos=~p", [ClientId, Topic, OldQos, Qos]), + lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); + false -> + %%TODO: the design is ugly, rewrite later...:( + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_msg_store:redeliver(Topic, self()), + [{Topic, Qos} | Acc] + end + end, Subscriptions, Topics), {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; -handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) -> +handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, + subscriptions = Subscriptions}) -> - %%unsubscribe from topic tree + %% unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), - lager:info([{client, ClientId}], "Session ~s unsubscribe ~p.", [ClientId, Topics]), + + lager:info([{client, ClientId}], "Session ~s unsubscribe ~p", [ClientId, Topics]), Subscriptions1 = lists:foldl(fun(Topic, Acc) -> @@ -271,21 +278,24 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId {Topic, _Qos} -> lists:keydelete(Topic, 1, Acc); false -> - lager:warning([{client, ClientId}], "~s not subscribe ~s", [ClientId, Topic]), Acc + lager:warning([{client, ClientId}], "Session ~s not subscribe ~s", [ClientId, Topic]), Acc end end, Subscriptions, Topics), {reply, ok, Session#session{subscriptions = Subscriptions1}}; -handle_call({publish, Message = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, - Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, + Session = #session{clientid = ClientId, + awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of - true -> + true -> TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}), - {reply, ok, Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}}; + AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel), + {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:error([{clientid, ClientId}], "Session ~s " - " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]), + lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message " + "for too many awaiting_rel: ~p", [ClientId, Msg]), {reply, {error, dropped}, Session} end; @@ -297,59 +307,52 @@ handle_call(Req, _From, State) -> lager:critical("Unexpected Request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({resume, ClientId, ClientPid}, State = #session{ - clientid = ClientId, - client_pid = OldClientPid, - pending_queue = Queue, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expired_timer = ETimer}) -> +handle_cast({resume, ClientId, ClientPid}, Session) -> + + #session{clientid = ClientId, + client_pid = OldClientPid, + inflight_queue = InflightQ, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expired_timer = ETimer} = Session, lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), - %% kick old client... - if - OldClientPid =:= undefined -> - ok; - OldClientPid =:= ClientPid -> - ok; - true -> - lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), - unlink(OldClientPid), - OldClientPid ! {stop, duplicate_id, ClientPid} - end, + %% cancel expired timer + cancel_timer(ETimer), - %% cancel timeout timer - emqttd_util:cancel_timer(ETimer), + kick(ClientId, ClientPid, OldClientPid), - %% redelivery PUBREL - lists:foreach(fun(MsgId) -> - ClientPid ! {redeliver, {?PUBREL, MsgId}} - end, maps:keys(AwaitingComp)), + %% Redeliver PUBREL + [ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)], - %% redelivery messages that awaiting PUBACK or PUBREC - Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Dup(Msg)}} - end, maps:values(AwaitingAck)), + %% Clear awaiting_ack timers + [cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)], - %% send offline messages - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Msg}} - end, emqttd_queue:all(Queue)), + %% Clear awaiting_comp timers + [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], - {noreply, State#session{client_pid = ClientPid, - %%TODO: - pending_queue = emqttd_queue:clear(Queue), - expired_timer = undefined}, hibernate}; + Session1 = Session#session{client_pid = ClientPid, + awaiting_ack = #{}, + awaiting_comp = #{}, + expired_timer = undefined}, + %% Redeliver inflight messages + Session2 = + lists:foldl(fun({_Id, Msg}, Sess) -> + redeliver(Msg#mqtt_message{dup = true}, Sess) + end, Session1, lists:reverse(InflightQ)), -handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) -> + %% Dequeue pending messages + {noreply, dequeue(Session2), hibernate}; + +%% PUBRAC +handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) -> case maps:find(MsgId, Awaiting) of {ok, {_, TRef}} -> - catch erlang:cancel_timer(TRef), - {noreply, dispatch(Session#session{inflight_queue = emqttd_inflight:ack(MsgId, Q), - awaiting_ack = maps:remove(MsgId, Awaiting)})}; + cancel_timer(TRef), + Session1 = acked(MsgId, Session), + {noreply, dequeue(Session1)}; error -> lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]), {noreply, Session} @@ -362,33 +365,36 @@ handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, await_rel_timeout = Timeout}) -> case maps:find(MsgId, AwaitingAck) of {ok, {_, TRef}} -> - catch erlang:cancel_timer(TRef), + cancel_timer(TRef), TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}), - {noreply, dispatch(Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck), - awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)})}; + Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}), + {noreply, dequeue(Session1)}; error -> lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]), {noreply, Session} end; -handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel}) -> +%% PUBREL +handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, + awaiting_rel = AwaitingRel}) -> case maps:find(MsgId, AwaitingRel) of {ok, {Msg, TRef}} -> - catch erlang:cancel_timer(TRef), + cancel_timer(TRef), emqttd_pubsub:publish(Msg), {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; error -> - lager:error("Session ~s cannot find PUBREL '~p'!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]), {noreply, Session} end; %% PUBCOMP handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> - case maps:is_key(MsgId, AwaitingComp) of - true -> + case maps:find(MsgId, AwaitingComp) of + {ok, TRef} -> + cancel_timer(TRef), {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}}; - false -> - lager:error("Session ~s cannot find PUBREC MsgId '~p'", [ClientId, MsgId]), + error -> + lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]), {noreply, Session} end; @@ -396,61 +402,103 @@ handle_cast(Msg, State) -> lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), {noreply, State}. -handle_info({dispatch, MsgList}, Session) when is_list(MsgList) -> - NewSession = lists:foldl(fun(Msg, S) -> - dispatch({new, Msg}, S) - end, Session, MsgList), - {noreply, NewSession}; +%% Queue messages when client is offline +handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, + message_queue = Q}) + when is_record(Msg, mqtt_message) -> + {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}}; -handle_info({dispatch, {old, Msg}}, Session) when is_record(Msg, mqtt_message) -> - {noreply, dispatch({old, Msg}, Session)}; +%% Dispatch qos0 message directly to client +handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, + Session = #session{client_pid = ClientPid}) -> + ClientPid ! {deliver, Msg}, + {noreply, Session}; -handle_info({dispatch, Msg}, Session) when is_record(Msg, mqtt_message) -> - {noreply, dispatch({new, Msg}, Session)}; +handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, + Session = #session{clientid = ClientId, message_queue = MsgQ}) + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + + case check_inflight(Session) of + true -> + {noreply, deliver(Msg, Session)}; + false -> + lager:warning([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]), + {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} + end; + +handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined, + awaiting_ack = AwaitingAck}) -> + %% just remove awaiting + {noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}}; + +handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId, + inflight_queue = InflightQ, + awaiting_ack = AwaitingAck}) -> + case maps:find(MsgId, AwaitingAck) of + {ok, {{0, _Timeout}, _TRef}} -> + Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), + awaiting_ack = maps:remove(MsgId, AwaitingAck)}, + {noreply, dequeue(Session1)}; + {ok, {{Retries, Timeout}, _TRef}} -> + TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), + AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), + {noreply, Session#session{awaiting_ack = AwaitingAck1}}; + error -> + lager:error([{client, ClientId}], "Session ~s " + "cannot find Awaiting Ack:~p", [ClientId, MsgId]), + {noreply, Session} + end; + +handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, + awaiting_rel = AwaitingRel}) -> + case maps:find(MsgId, AwaitingRel) of + {ok, {Msg, _TRef}} -> + lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" + "Drop Message:~p", [ClientId, Msg]), + {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; + error -> + lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]), + {noreply, Session} + end; + +handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, + awaiting_comp = Awaiting}) -> + case maps:find(MsgId, Awaiting) of + {ok, _TRef} -> + lager:error([{client, ClientId}], "Session ~s " + "Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), + {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; + error -> + lager:error([{client, ClientId}], "Session ~s " + "Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), + {noreply, Session} + end; + +handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, + client_pid = ClientPid}) -> + {stop, normal, Session}; handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, clientid = ClientId, client_pid = ClientPid, expired_after = Expires}) -> - %%TODO: Clean puback, pubrel, pubcomp timers - lager:info("Session ~s: client ~p exited for ~p", [ClientId, ClientPid, Reason]), - TRef = timer(Expires * 1000, session_expired), - {noreply, Session#session{expired_timer = TRef}}; + lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), + TRef = timer(Expires, session_expired), + {noreply, Session#session{expired_timer = TRef}, hibernate}; -handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> - %%TODO: reason... - {stop, normal, Session}; +handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId, + client_pid = ClientPid}) -> + + lager:error("Session ~s received unexpected EXIT:" + " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), + {noreply, Session}; -handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> - lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), - {noreply, State}; - -handle_info(session_expired, State = #session{clientid = ClientId}) -> +handle_info(session_expired, Session = #session{clientid = ClientId}) -> lager:error("Session ~s expired, shutdown now!", [ClientId]), - {stop, {shutdown, expired}, State}; + {stop, {shutdown, expired}, Session}; -handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> - case maps:find(MsgId, Awaiting) of - {ok, {Msg, _TRef}} -> - lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), - {noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}}; - error -> - lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]), - {noreply, Session} - end; - -handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = Awaiting}) -> - case maps:find(MsgId, Awaiting) of - {ok, _TRef} -> - lager:error([{client, ClientId}], "Session ~s Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), - {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; - error -> - lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), - {noreply, Session} - end; - -handle_info(Info, Session) -> - lager:critical("Unexpected Info: ~p, Session: ~p", [Info, Session]), +handle_info(Info, Session = #session{clientid = ClientId}) -> + lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), {noreply, Session}. terminate(_Reason, _Session) -> @@ -464,11 +512,26 @@ code_change(_OldVsn, Session, _Extra) -> %%%============================================================================= %%------------------------------------------------------------------------------ -%% @private -%% @doc Plubish Qos2 message from client -> broker, and then wait for pubrel. -%% @end +%% Kick duplicated client %%------------------------------------------------------------------------------ +kick(_ClientId, _ClientPid, undefined) -> + ok; +kick(_ClientId, ClientPid, ClientPid) -> + ok; +kick(ClientId, ClientPid, OldClientPid) -> + lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid}. + +%%------------------------------------------------------------------------------ +%% Check inflight and awaiting_rel +%%------------------------------------------------------------------------------ + +check_inflight(#session{max_inflight = 0}) -> + true; +check_inflight(#session{max_inflight = Max, inflight_queue = Q}) -> + Max > length(Q). check_awaiting_rel(#session{max_awaiting_rel = 0}) -> true; @@ -476,72 +539,61 @@ check_awaiting_rel(#session{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) -> maps:size(AwaitingRel) < MaxLen. -%%%============================================================================= -%%% Dispatch message from broker -> client. -%%%============================================================================= +%%------------------------------------------------------------------------------ +%% Dequeue and Deliver +%%------------------------------------------------------------------------------ -dispatch(Session = #session{client_pid = undefined}) -> - %% do nothing +dequeue(Session = #session{client_pid = undefined}) -> + %% do nothing if client is disconnected Session; -dispatch(Session = #session{pending_queue = PendingQ}) -> - case emqttd_mqueue:out(PendingQ) of - {empty, _Q} -> - Session; - {{value, Msg}, Q1} -> - self() ! {dispatch, {old, Msg}}, - Session#session{pending_queue = Q1} - end. - -%% queued the message if client is offline -dispatch({Type, Msg}, Session = #session{client_pid = undefined, - pending_queue= PendingQ}) -> - Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)}; - -%% dispatch qos0 directly to client process -dispatch({_Type, Msg} = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, Session; - -%% dispatch qos1/2 message and wait for puback -dispatch({Type, Msg = #mqtt_message{qos = Qos}}, Session = #session{clientid = ClientId, - client_pid = ClientPid, - message_id = MsgId, - pending_queue = PendingQ, - inflight_queue= InflightQ}) - when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 -> - %% assign id first - Msg1 = Msg#mqtt_message{msgid = MsgId}, - Msg2 = - if - Qos =:= ?QOS_1 -> Msg1; - Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false} - end, - case emqttd_inflight:in(Msg1, InflightQ) of - {error, full} -> - lager:error("Session ~s inflight queue is full!", [ClientId]), - Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)}; - {ok, InflightQ1} -> - ClientPid ! {deliver, Msg1}, - await_ack(Msg1, next_msgid(Session#session{inflight_queue = InflightQ1})) +dequeue(Session) -> + case check_inflight(Session) of + true -> dequeue2(Session); + false -> Session end. -deliver(Msg, Session) -> - ok. +dequeue2(Session = #session{message_queue = Q}) -> + case emqttd_mqueue:out(Q) of + {empty, _Q} -> Session; + {{value, Msg}, Q1} -> + Session1 = deliver(Msg, Session#session{message_queue = Q1}), + dequeue(Session1) %% dequeue more + end. -await(Msg, Session) -> - ok. +deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> + ClientPid ! {deliver, Msg}, Session; -% message(qos1/2) is awaiting ack -await_ack(Msg = #mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, - unack_retries = Retries, - unack_timeout = Timeout}) -> - - TRef = timer(Timeout * 1000, {retry, MsgId}), +deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId, + client_pid = ClientPid, + inflight_queue = InflightQ}) + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false}, + ClientPid ! {deliver, Msg1}, + await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})). + +redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) -> + deliver(Msg, Session); + +redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid}) + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + ClientPid ! {deliver, Msg}, + await(Msg, Session). + +%%------------------------------------------------------------------------------ +%% Awaiting ack for qos1, qos2 message +%%------------------------------------------------------------------------------ +await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, + unack_retries = Retries, + unack_timeout = Timeout}) -> + TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting), Session#session{awaiting_ack = Awaiting1}. -timer(Timeout, TimeoutMsg) -> - erlang:send_after(Timeout * 1000, self(), TimeoutMsg). +acked(MsgId, Session = #session{inflight_queue = InflightQ, + awaiting_ack = Awaiting}) -> + Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), + awaiting_ack = maps:remove(MsgId, Awaiting)}. next_msgid(Session = #session{message_id = 16#ffff}) -> Session#session{message_id = 1}; @@ -549,3 +601,11 @@ next_msgid(Session = #session{message_id = 16#ffff}) -> next_msgid(Session = #session{message_id = MsgId}) -> Session#session{message_id = MsgId + 1}. +timer(Timeout, TimeoutMsg) -> + erlang:send_after(Timeout * 1000, self(), TimeoutMsg). + +cancel_timer(undefined) -> + undefined; +cancel_timer(Ref) -> + catch erlang:cancel_timer(Ref). +