From ddf831f36156b95ffa8726bda160e9110d4d935f Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 12 Jun 2015 17:24:08 +0800 Subject: [PATCH] session --- apps/emqttd/src/emqttd_mqueue.erl | 143 ++++++++-------- apps/emqttd/src/emqttd_mqwin.erl | 66 ++++++++ apps/emqttd/src/emqttd_session.erl | 215 +++--------------------- apps/emqttd/src/emqttd_session_proc.erl | 183 ++++++++++++++++++++ rel/files/emqttd.config | 22 ++- 5 files changed, 359 insertions(+), 270 deletions(-) create mode 100644 apps/emqttd/src/emqttd_mqwin.erl diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index 874aa82d0..84381f036 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -20,15 +20,30 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% Simple message queue. +%%% +%%% A Simple in-memory message queue. %%% %%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client %%% should be online in most of the time. %%% -%%% This module wraps an erlang queue to store offline messages temporarily for MQTT -%%% persistent session. +%%% This module implements a simple in-memory queue for MQTT persistent session. %%% -%%% If the broker restarted or crashed, all the messages stored will be gone. +%%% If the broker restarted or crashed, all the messages queued will be gone. +%%% +%%% Desgin of The Queue: +%%% |<----------------- Max Len ----------------->| +%%% ----------------------------------------------- +%%% IN -> | Pending Messages | Inflight Window | -> Out +%%% ----------------------------------------------- +%%% |<--- Win Size --->| +%%% +%%% +%%% 1. Inflight Window to store the messages awaiting for ack. +%%% +%%% 2. Suspend IN messages when the queue is deactive, or inflight windows is full. +%%% +%%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true, +%%% otherwise dropped the oldest pending one. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -40,104 +55,98 @@ -include_lib("emqtt/include/emqtt.hrl"). -export([new/2, name/1, - is_empty/1, len/1, - in/2, out/1, - peek/1, - to_list/1]). - --define(MAX_LEN, 600). - --define(HIGH_WM, 0.6). + is_empty/1, is_full/1, + len/1, in/2, out/2]). -define(LOW_WM, 0.2). +-define(HIGH_WM, 0.6). + +-define(MAX_LEN, 1000). + -record(mqueue, {name, - len = 0, - max_len = ?MAX_LEN, - queue = queue:new(), - store_qos0 = false, - high_watermark = ?HIGH_WM, - low_watermark = ?LOW_WM, - alert = false}). + q = queue:new(), %% pending queue + len = 0, %% current queue len + low_wm = ?LOW_WM, + high_wm = ?HIGH_WM, + max_len = ?MAX_LEN, + qos0 = false, + alarm = false}). -type mqueue() :: #mqueue{}. --type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued - | {high_queue_watermark, float()} %% High watermark - | {low_queue_watermark, float()} %% Low watermark - | {queue_qos0_messages, boolean()}. %% Queue Qos0 messages? +-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length + | {inflight_window, pos_integer()} %% Inflight Window + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}. %% Queue Qos0 + +-export_type([mqueue/0]). %%------------------------------------------------------------------------------ %% @doc New Queue. %% @end %%------------------------------------------------------------------------------ --spec new(binary() | string(), list(queue_option())) -> mqueue(). +-spec new(binary(), list(mqueue_option())) -> mqueue(). new(Name, Opts) -> - MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN), - HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)), - LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)), - StoreQos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), - #mqueue{name = Name, - max_len = MaxLen, - store_qos0 = StoreQos0, - high_watermark = HighWM, - low_watermark = LowWM}. + MaxLen = emqttd_opts:g(max_length, Opts, ?MAX_LEN), + #mqueue{name = Name, + max_len = MaxLen, + low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)), + high_wm = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)), + qos0 = emqttd_opts:g(queue_qos0, Opts, true)}. name(#mqueue{name = Name}) -> Name. -len(#mqueue{len = Len}) -> - Len. - is_empty(#mqueue{len = 0}) -> true; -is_empty(_Q) -> false. +is_empty(_MQ) -> false. + +is_full(#mqueue{len = Len, max_len = MaxLen}) + when Len =:= MaxLen -> true; +is_full(_MQ) -> false. + +len(#mqueue{len = Len}) -> Len. %%------------------------------------------------------------------------------ -%% @doc -%% Queue one message. -%% +%% @doc Queue one message. %% @end %%------------------------------------------------------------------------------ -spec in(mqtt_message(), mqueue()) -> mqueue(). -in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + +%% drop qos0 +in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; -%% queue is full, drop the oldest -in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen -> - Q2 = case queue:out(Q) of - {{value, OldMsg}, Q1} -> - %%TODO: publish the dropped message to $SYS? - lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]), - Q1; - {empty, Q1} -> %% maybe max_len is 1 - Q1 - end, - MQ#mqueue{queue = queue:in(Msg, Q2)}; -in(Msg, MQ = #mqueue{len = Len, queue = Q}) -> - maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}). -out(MQ = #mqueue{len = 0, queue = _Q}) -> +%% simply drop the oldest one if queue is full, improve later +in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen}) + when Len =:= MaxLen -> + {{value, OldMsg}, Q2} = queue:out(Q), + lager:error("queue(~s) drop message: ~p", [Name, OldMsg]), + MQ#mqueue{q = queue:in(Msg, Q2)}; + +in(Msg, MQ = #mqueue{q = Q, len = Len}) -> + maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); + +out(MQ = #mqueue{len = 0}) -> {empty, MQ}; -out(MQ = #mqueue{len = Len, queue = Q}) -> - {Result, Q1} = queue:out(Q), - {Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}. -peek(#mqueue{queue = Q}) -> - queue:peek(Q). +out(MQ = #mqueue{q = Q, len = Len}) -> + {Result, Q2} = queue:out(Q), + {Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}. -to_list(#mqueue{queue = Q}) -> - queue:to_list(Q). - -maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false}) +maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = false}) when Len >= HighWM -> AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]), emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}), - MQ#mqueue{alert = true}; + MQ#mqueue{alarm = true}; maybe_set_alarm(MQ) -> MQ. -maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true}) +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alarm = true}) when Len =< LowWM -> - emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false}; + emqttd_alarm:clear_alarm({queue_high_watermark, Name}), + MQ#mqueue{alarm = false}; maybe_clear_alarm(MQ) -> MQ. diff --git a/apps/emqttd/src/emqttd_mqwin.erl b/apps/emqttd/src/emqttd_mqwin.erl new file mode 100644 index 000000000..acc90bf41 --- /dev/null +++ b/apps/emqttd/src/emqttd_mqwin.erl @@ -0,0 +1,66 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% Inflight window of message queue. Wrap a list with len. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_mqwin). + +-author("Feng Lee "). + +-export([new/2, len/1, in/2, ack/2]). + +-define(WIN_SIZE, 100). + +-record(mqwin, {name, + w = [], %% window list + len = 0, %% current window len + size = ?WIN_SIZE}). + +-type mqwin() :: #mqwin{}. + +-export_type([mqwin/0]). + +new(Name, Opts) -> + WinSize = emqttd_opts:g(inflight_window, Opts, ?WIN_SIZE), + #mqwin{name = Name, size = WinSize}. + +len(#mqwin{len = Len}) -> + Len. + +in(_Msg, #mqwin{len = Len, size = Size}) + when Len =:= Size -> {error, full}; + +in(Msg, Win = #mqwin{w = W, len = Len}) -> + {ok, Win#mqwin{w = [Msg|W], len = Len +1}}. + +ack(MsgId, QWin = #mqwin{w = W, len = Len}) -> + case lists:keyfind(MsgId, 2, W) of + false -> + lager:error("qwin(~s) cannot find msgid: ~p", [MsgId]), QWin; + _Msg -> + QWin#mqwin{w = lists:keydelete(MsgId, 2, W), len = Len - 1} + end. + + diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 6b6d961a4..44bc76bb4 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -35,24 +35,20 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). -%% API Function Exports +-define(SessProc, emqttd_session_proc). + +%% Session Managenent APIs -export([start/1, resume/3, - publish/3, + destroy/2]). + +%% PubSub APIs +-export([publish/3, puback/2, subscribe/2, unsubscribe/2, - destroy/2]). - -%% This api looks strange... :( --export([store/2]). - -%% Start gen_server --export([start_link/2]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + await/2, + dispatch/2]). -record(session, { %% ClientId: Identifier of Session @@ -67,20 +63,11 @@ %% Client’s subscriptions. subscriptions :: list(), - %% Inflight window size - inflight_window = 40, - %% Inflight qos1, qos2 messages sent to the client but unacked, %% QoS 1 and QoS 2 messages which have been sent to the Client, %% but have not been completely acknowledged. %% Client <- Broker - inflight_queue :: list(), - - %% Inflight qos2 messages received from client and waiting for pubrel. - %% QoS 2 messages which have been received from the Client, - %% but have not been completely acknowledged. - %% Client -> Broker - awaiting_queue :: list(), + inflight_window :: emqttd_mqwin:mqwin(), %% All qos1, qos2 messages published to when client is disconnected. %% QoS 1 and QoS 2 messages pending transmission to the Client. @@ -88,18 +75,22 @@ %% Optionally, QoS 0 messages pending transmission to the Client. pending_queue :: emqttd_mqueue:mqueue(), + %% Inflight qos2 messages received from client and waiting for pubrel. + %% QoS 2 messages which have been received from the Client, + %% but have not been completely acknowledged. + %% Client -> Broker + awaiting_rel :: map(), + %% Awaiting timers for ack, rel and comp. awaiting_ack :: map(), - awaiting_rel :: map(), - awaiting_comp :: map(), %% Retries to resend the unacked messages - max_unack_retries = 3, + unack_retries = 3, %% 4, 8, 16 seconds if 3 retries:) - unack_retry_after = 4, + unack_timeout = 4, %% Awaiting PUBREL timeout await_rel_timeout = 8, @@ -109,9 +100,9 @@ sess_expired_timer, - timestamp }). + timestamp}). --type session() :: #session{} | pid(). +-type session() :: #session{}. %%%============================================================================= %%% Session API @@ -139,6 +130,7 @@ start({false = _CleanSess, ClientId, ClientPid}) -> resume(SessState = #session{}, _ClientId, _ClientPid) -> SessState; resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> + ?SessProc: gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid. @@ -342,171 +334,6 @@ initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), State#session{client_pid = ClientPid}. -%%------------------------------------------------------------------------------ -%% @doc Start a session process. -%% @end -%%------------------------------------------------------------------------------ -start_link(ClientId, ClientPid) -> - gen_server:start_link(?MODULE, [ClientId, ClientPid], []). - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([ClientId, ClientPid]) -> - process_flag(trap_exit, true), - true = link(ClientPid), - State = initial_state(ClientId, ClientPid), - MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)), - State1 = State#session{pending_queue = MQueue, - timestamp = os:timestamp()}, - {ok, init(emqttd:env(mqtt, session), State1), hibernate}. - -init([], State) -> - State; - -%% Session expired after hours -init([{expired_after, Hours} | Opts], State) -> - init(Opts, State#session{sess_expired_after = Hours * 3600}); - -%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. -init([{max_inflight_messages, MaxInflight} | Opts], State) -> - init(Opts, State#session{inflight_window = MaxInflight}); - -%% Max retries for unacknolege Qos1/2 messages -init([{max_unack_retries, Retries} | Opts], State) -> - init(Opts, State#session{max_unack_retries = Retries}); - -%% Retry after 4, 8, 16 seconds -init([{unack_retry_after, Secs} | Opts], State) -> - init(Opts, State#session{unack_retry_after = Secs}); - -%% Awaiting PUBREL timeout -init([{await_rel_timeout, Secs} | Opts], State) -> - init(Opts, State#session{await_rel_timeout = Secs}); - -init([Opt | Opts], State) -> - lager:error("Bad Session Option: ~p", [Opt]), - init(Opts, State). - -handle_call({subscribe, Topics}, _From, State) -> - {ok, NewState, GrantedQos} = subscribe(State, Topics), - {reply, {ok, GrantedQos}, NewState}; - -handle_call({unsubscribe, Topics}, _From, State) -> - {ok, NewState} = unsubscribe(State, Topics), - {reply, ok, NewState}; - -handle_call(Req, _From, State) -> - lager:error("Unexpected request: ~p", [Req]), - {reply, error, State}. - -handle_cast({resume, ClientId, ClientPid}, State = #session{ - clientid = ClientId, - client_pid = OldClientPid, - msg_queue = Queue, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expire_timer = ETimer}) -> - - lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), - - %% kick old client... - if - OldClientPid =:= undefined -> - ok; - OldClientPid =:= ClientPid -> - ok; - true -> - lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), - unlink(OldClientPid), - OldClientPid ! {stop, duplicate_id, ClientPid} - end, - - %% cancel timeout timer - emqttd_util:cancel_timer(ETimer), - - %% redelivery PUBREL - lists:foreach(fun(PacketId) -> - ClientPid ! {redeliver, {?PUBREL, PacketId}} - end, maps:keys(AwaitingComp)), - - %% redelivery messages that awaiting PUBACK or PUBREC - Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Dup(Msg)}} - end, maps:values(AwaitingAck)), - - %% send offline messages - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Msg}} - end, emqttd_queue:all(Queue)), - - {noreply, State#session{client_pid = ClientPid, - msg_queue = emqttd_queue:clear(Queue), - expire_timer = undefined}, hibernate}; - -handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> - NewState = publish(State, ClientId, {?QOS_2, Message}), - {noreply, NewState}; - -handle_cast({puback, PacketId}, State) -> - NewState = puback(State, {?PUBACK, PacketId}), - {noreply, NewState}; - -handle_cast({pubrec, PacketId}, State) -> - NewState = puback(State, {?PUBREC, PacketId}), - {noreply, NewState}; - -handle_cast({pubrel, PacketId}, State) -> - NewState = puback(State, {?PUBREL, PacketId}), - {noreply, NewState}; - -handle_cast({pubcomp, PacketId}, State) -> - NewState = puback(State, {?PUBCOMP, PacketId}), - {noreply, NewState}; - -handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) -> - lager:warning("Session ~s destroyed", [ClientId]), - {stop, normal, State}; - -handle_cast(Msg, State) -> - lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), - {noreply, State}. - -handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> - F = fun(Message, S) -> dispatch(Message, S) end, - {noreply, lists:foldl(F, State, Messages)}; - -handle_info({dispatch, {_From, Message}}, State) -> - {noreply, dispatch(Message, State)}; - -handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId, - client_pid = ClientPid}) -> - lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), - {noreply, start_expire_timer(State#session{client_pid = undefined})}; - -handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> - lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), - {noreply, State}; - -handle_info(session_expired, State = #session{clientid = ClientId}) -> - lager:warning("Session ~s expired!", [ClientId]), - {stop, {shutdown, expired}, State}; - -handle_info({timeout, awaiting_rel, MsgId}, SessState) -> - NewState = timeout(awaiting_rel, MsgId, SessState), - {noreply, NewState}; - -handle_info(Info, State) -> - lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. %%%============================================================================= %%% Internal functions diff --git a/apps/emqttd/src/emqttd_session_proc.erl b/apps/emqttd/src/emqttd_session_proc.erl index 5f08b68af..43b60b75e 100644 --- a/apps/emqttd/src/emqttd_session_proc.erl +++ b/apps/emqttd/src/emqttd_session_proc.erl @@ -27,3 +27,186 @@ -module(emqttd_session_proc). +-author("Feng Lee "). + +-include("emqttd.hrl"). + +-include_lib("emqtt/include/emqtt.hrl"). + +-include_lib("emqtt/include/emqtt_packet.hrl"). + +%% Start gen_server +-export([start_link/2]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%------------------------------------------------------------------------------ +%% @doc Start a session process. +%% @end +%%------------------------------------------------------------------------------ +start_link(ClientId, ClientPid) -> + gen_server:start_link(?MODULE, [ClientId, ClientPid], []). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +init([ClientId, ClientPid]) -> + process_flag(trap_exit, true), + true = link(ClientPid), + State = initial_state(ClientId, ClientPid), + MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)), + State1 = State#session{pending_queue = MQueue, + timestamp = os:timestamp()}, + {ok, init(emqttd:env(mqtt, session), State1), hibernate}. + +init([], State) -> + State; + +%% Session expired after hours +init([{expired_after, Hours} | Opts], State) -> + init(Opts, State#session{sess_expired_after = Hours * 3600}); + +%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. +init([{max_inflight_messages, MaxInflight} | Opts], State) -> + init(Opts, State#session{inflight_window = MaxInflight}); + +%% Max retries for unacknolege Qos1/2 messages +init([{max_unack_retries, Retries} | Opts], State) -> + init(Opts, State#session{max_unack_retries = Retries}); + +%% Retry after 4, 8, 16 seconds +init([{unack_retry_after, Secs} | Opts], State) -> + init(Opts, State#session{unack_retry_after = Secs}); + +%% Awaiting PUBREL timeout +init([{await_rel_timeout, Secs} | Opts], State) -> + init(Opts, State#session{await_rel_timeout = Secs}); + +init([Opt | Opts], State) -> + lager:error("Bad Session Option: ~p", [Opt]), + init(Opts, State). + +handle_call({subscribe, Topics}, _From, State) -> + {ok, NewState, GrantedQos} = subscribe(State, Topics), + {reply, {ok, GrantedQos}, NewState}; + +handle_call({unsubscribe, Topics}, _From, State) -> + {ok, NewState} = unsubscribe(State, Topics), + {reply, ok, NewState}; + +handle_call(Req, _From, State) -> + lager:error("Unexpected request: ~p", [Req]), + {reply, error, State}. + +handle_cast({resume, ClientId, ClientPid}, State = #session{ + clientid = ClientId, + client_pid = OldClientPid, + msg_queue = Queue, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expire_timer = ETimer}) -> + + lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), + + %% kick old client... + if + OldClientPid =:= undefined -> + ok; + OldClientPid =:= ClientPid -> + ok; + true -> + lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} + end, + + %% cancel timeout timer + emqttd_util:cancel_timer(ETimer), + + %% redelivery PUBREL + lists:foreach(fun(PacketId) -> + ClientPid ! {redeliver, {?PUBREL, PacketId}} + end, maps:keys(AwaitingComp)), + + %% redelivery messages that awaiting PUBACK or PUBREC + Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Dup(Msg)}} + end, maps:values(AwaitingAck)), + + %% send offline messages + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Msg}} + end, emqttd_queue:all(Queue)), + + {noreply, State#session{client_pid = ClientPid, + msg_queue = emqttd_queue:clear(Queue), + expire_timer = undefined}, hibernate}; + + +handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> + NewState = publish(State, ClientId, {?QOS_2, Message}), + {noreply, NewState}; + +handle_cast({puback, PacketId}, State) -> + NewState = puback(State, {?PUBACK, PacketId}), + {noreply, NewState}; + +handle_cast({pubrec, PacketId}, State) -> + NewState = puback(State, {?PUBREC, PacketId}), + {noreply, NewState}; + +handle_cast({pubrel, PacketId}, State) -> + NewState = puback(State, {?PUBREL, PacketId}), + {noreply, NewState}; + +handle_cast({pubcomp, PacketId}, State) -> + NewState = puback(State, {?PUBCOMP, PacketId}), + {noreply, NewState}; + +handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) -> + lager:warning("Session ~s destroyed", [ClientId]), + {stop, normal, State}; + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + +handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> + F = fun(Message, S) -> dispatch(Message, S) end, + {noreply, lists:foldl(F, State, Messages)}; + +handle_info({dispatch, {_From, Message}}, State) -> + {noreply, dispatch(Message, State)}; + +handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId, + client_pid = ClientPid}) -> + lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), + {noreply, start_expire_timer(State#session{client_pid = undefined})}; + +handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> + lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), + {noreply, State}; + +handle_info(session_expired, State = #session{clientid = ClientId}) -> + lager:warning("Session ~s expired!", [ClientId]), + {stop, {shutdown, expired}, State}; + +handle_info({timeout, awaiting_rel, MsgId}, SessState) -> + NewState = timeout(awaiting_rel, MsgId, SessState), + {noreply, NewState}; + +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index da32fbfc9..2df163bae 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -89,8 +89,6 @@ {session, [ %% Expired after 2 days {expired_after, 48}, - %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. - {max_inflight_messages, 40}, %% Max retries for unacknolege Qos1/2 messages {max_unack_retries, 3}, %% Retry after 4, 8, 16 seconds @@ -99,14 +97,20 @@ {await_rel_timeout, 8} ]}, {queue, [ - %% Max messages queued when client is disconnected, or inflight messsage window is overload - {max_queued_messages, 200}, - %% High watermark of queued messsages - {high_queue_watermark, 0.8}, + %% Max queue length + {max_length, 1000}, + + %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. + {inflight_window, 100}, + %% Low watermark of queued messsages - {low_queue_watermark, 0.2}, - %% Queue Qos0 offline messages? - {queue_qos0_messages, true} + {low_watermark, 0.2}, + + %% High watermark of queued messsages + {high_watermark, 0.6}, + + %% Queue Qos0 messages? + {queue_qos0, true} ]} ]}, %% Broker Options