session
This commit is contained in:
parent
04c2772859
commit
ddf831f361
|
@ -20,15 +20,30 @@
|
|||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% Simple message queue.
|
||||
%%%
|
||||
%%% A Simple in-memory message queue.
|
||||
%%%
|
||||
%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
|
||||
%%% should be online in most of the time.
|
||||
%%%
|
||||
%%% This module wraps an erlang queue to store offline messages temporarily for MQTT
|
||||
%%% persistent session.
|
||||
%%% This module implements a simple in-memory queue for MQTT persistent session.
|
||||
%%%
|
||||
%%% If the broker restarted or crashed, all the messages stored will be gone.
|
||||
%%% If the broker restarted or crashed, all the messages queued will be gone.
|
||||
%%%
|
||||
%%% Desgin of The Queue:
|
||||
%%% |<----------------- Max Len ----------------->|
|
||||
%%% -----------------------------------------------
|
||||
%%% IN -> | Pending Messages | Inflight Window | -> Out
|
||||
%%% -----------------------------------------------
|
||||
%%% |<--- Win Size --->|
|
||||
%%%
|
||||
%%%
|
||||
%%% 1. Inflight Window to store the messages awaiting for ack.
|
||||
%%%
|
||||
%%% 2. Suspend IN messages when the queue is deactive, or inflight windows is full.
|
||||
%%%
|
||||
%%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
|
||||
%%% otherwise dropped the oldest pending one.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
@ -40,104 +55,98 @@
|
|||
-include_lib("emqtt/include/emqtt.hrl").
|
||||
|
||||
-export([new/2, name/1,
|
||||
is_empty/1, len/1,
|
||||
in/2, out/1,
|
||||
peek/1,
|
||||
to_list/1]).
|
||||
|
||||
-define(MAX_LEN, 600).
|
||||
|
||||
-define(HIGH_WM, 0.6).
|
||||
is_empty/1, is_full/1,
|
||||
len/1, in/2, out/2]).
|
||||
|
||||
-define(LOW_WM, 0.2).
|
||||
|
||||
-define(HIGH_WM, 0.6).
|
||||
|
||||
-define(MAX_LEN, 1000).
|
||||
|
||||
-record(mqueue, {name,
|
||||
len = 0,
|
||||
q = queue:new(), %% pending queue
|
||||
len = 0, %% current queue len
|
||||
low_wm = ?LOW_WM,
|
||||
high_wm = ?HIGH_WM,
|
||||
max_len = ?MAX_LEN,
|
||||
queue = queue:new(),
|
||||
store_qos0 = false,
|
||||
high_watermark = ?HIGH_WM,
|
||||
low_watermark = ?LOW_WM,
|
||||
alert = false}).
|
||||
qos0 = false,
|
||||
alarm = false}).
|
||||
|
||||
-type mqueue() :: #mqueue{}.
|
||||
|
||||
-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued
|
||||
| {high_queue_watermark, float()} %% High watermark
|
||||
| {low_queue_watermark, float()} %% Low watermark
|
||||
| {queue_qos0_messages, boolean()}. %% Queue Qos0 messages?
|
||||
-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length
|
||||
| {inflight_window, pos_integer()} %% Inflight Window
|
||||
| {low_watermark, float()} %% Low watermark
|
||||
| {high_watermark, float()} %% High watermark
|
||||
| {queue_qos0, boolean()}. %% Queue Qos0
|
||||
|
||||
-export_type([mqueue/0]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc New Queue.
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec new(binary() | string(), list(queue_option())) -> mqueue().
|
||||
-spec new(binary(), list(mqueue_option())) -> mqueue().
|
||||
new(Name, Opts) ->
|
||||
MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN),
|
||||
HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)),
|
||||
LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)),
|
||||
StoreQos0 = emqttd_opts:g(queue_qos0_messages, Opts, false),
|
||||
MaxLen = emqttd_opts:g(max_length, Opts, ?MAX_LEN),
|
||||
#mqueue{name = Name,
|
||||
max_len = MaxLen,
|
||||
store_qos0 = StoreQos0,
|
||||
high_watermark = HighWM,
|
||||
low_watermark = LowWM}.
|
||||
low_wm = round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)),
|
||||
high_wm = round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)),
|
||||
qos0 = emqttd_opts:g(queue_qos0, Opts, true)}.
|
||||
|
||||
name(#mqueue{name = Name}) ->
|
||||
Name.
|
||||
|
||||
len(#mqueue{len = Len}) ->
|
||||
Len.
|
||||
|
||||
is_empty(#mqueue{len = 0}) -> true;
|
||||
is_empty(_Q) -> false.
|
||||
is_empty(_MQ) -> false.
|
||||
|
||||
is_full(#mqueue{len = Len, max_len = MaxLen})
|
||||
when Len =:= MaxLen -> true;
|
||||
is_full(_MQ) -> false.
|
||||
|
||||
len(#mqueue{len = Len}) -> Len.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Queue one message.
|
||||
%%
|
||||
%% @doc Queue one message.
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
-spec in(mqtt_message(), mqueue()) -> mqueue().
|
||||
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
||||
|
||||
%% drop qos0
|
||||
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
|
||||
MQ;
|
||||
%% queue is full, drop the oldest
|
||||
in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen ->
|
||||
Q2 = case queue:out(Q) of
|
||||
{{value, OldMsg}, Q1} ->
|
||||
%%TODO: publish the dropped message to $SYS?
|
||||
lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]),
|
||||
Q1;
|
||||
{empty, Q1} -> %% maybe max_len is 1
|
||||
Q1
|
||||
end,
|
||||
MQ#mqueue{queue = queue:in(Msg, Q2)};
|
||||
in(Msg, MQ = #mqueue{len = Len, queue = Q}) ->
|
||||
maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}).
|
||||
|
||||
out(MQ = #mqueue{len = 0, queue = _Q}) ->
|
||||
%% simply drop the oldest one if queue is full, improve later
|
||||
in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen})
|
||||
when Len =:= MaxLen ->
|
||||
{{value, OldMsg}, Q2} = queue:out(Q),
|
||||
lager:error("queue(~s) drop message: ~p", [Name, OldMsg]),
|
||||
MQ#mqueue{q = queue:in(Msg, Q2)};
|
||||
|
||||
in(Msg, MQ = #mqueue{q = Q, len = Len}) ->
|
||||
maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
|
||||
|
||||
out(MQ = #mqueue{len = 0}) ->
|
||||
{empty, MQ};
|
||||
out(MQ = #mqueue{len = Len, queue = Q}) ->
|
||||
{Result, Q1} = queue:out(Q),
|
||||
{Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}.
|
||||
|
||||
peek(#mqueue{queue = Q}) ->
|
||||
queue:peek(Q).
|
||||
out(MQ = #mqueue{q = Q, len = Len}) ->
|
||||
{Result, Q2} = queue:out(Q),
|
||||
{Result, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}.
|
||||
|
||||
to_list(#mqueue{queue = Q}) ->
|
||||
queue:to_list(Q).
|
||||
|
||||
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false})
|
||||
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = false})
|
||||
when Len >= HighWM ->
|
||||
AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]),
|
||||
emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}),
|
||||
MQ#mqueue{alert = true};
|
||||
MQ#mqueue{alarm = true};
|
||||
maybe_set_alarm(MQ) ->
|
||||
MQ.
|
||||
|
||||
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true})
|
||||
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alarm = true})
|
||||
when Len =< LowWM ->
|
||||
emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false};
|
||||
emqttd_alarm:clear_alarm({queue_high_watermark, Name}),
|
||||
MQ#mqueue{alarm = false};
|
||||
maybe_clear_alarm(MQ) ->
|
||||
MQ.
|
||||
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% Inflight window of message queue. Wrap a list with len.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mqwin).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-export([new/2, len/1, in/2, ack/2]).
|
||||
|
||||
-define(WIN_SIZE, 100).
|
||||
|
||||
-record(mqwin, {name,
|
||||
w = [], %% window list
|
||||
len = 0, %% current window len
|
||||
size = ?WIN_SIZE}).
|
||||
|
||||
-type mqwin() :: #mqwin{}.
|
||||
|
||||
-export_type([mqwin/0]).
|
||||
|
||||
new(Name, Opts) ->
|
||||
WinSize = emqttd_opts:g(inflight_window, Opts, ?WIN_SIZE),
|
||||
#mqwin{name = Name, size = WinSize}.
|
||||
|
||||
len(#mqwin{len = Len}) ->
|
||||
Len.
|
||||
|
||||
in(_Msg, #mqwin{len = Len, size = Size})
|
||||
when Len =:= Size -> {error, full};
|
||||
|
||||
in(Msg, Win = #mqwin{w = W, len = Len}) ->
|
||||
{ok, Win#mqwin{w = [Msg|W], len = Len +1}}.
|
||||
|
||||
ack(MsgId, QWin = #mqwin{w = W, len = Len}) ->
|
||||
case lists:keyfind(MsgId, 2, W) of
|
||||
false ->
|
||||
lager:error("qwin(~s) cannot find msgid: ~p", [MsgId]), QWin;
|
||||
_Msg ->
|
||||
QWin#mqwin{w = lists:keydelete(MsgId, 2, W), len = Len - 1}
|
||||
end.
|
||||
|
||||
|
|
@ -35,24 +35,20 @@
|
|||
|
||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||
|
||||
%% API Function Exports
|
||||
-define(SessProc, emqttd_session_proc).
|
||||
|
||||
%% Session Managenent APIs
|
||||
-export([start/1,
|
||||
resume/3,
|
||||
publish/3,
|
||||
destroy/2]).
|
||||
|
||||
%% PubSub APIs
|
||||
-export([publish/3,
|
||||
puback/2,
|
||||
subscribe/2,
|
||||
unsubscribe/2,
|
||||
destroy/2]).
|
||||
|
||||
%% This api looks strange... :(
|
||||
-export([store/2]).
|
||||
|
||||
%% Start gen_server
|
||||
-export([start_link/2]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
await/2,
|
||||
dispatch/2]).
|
||||
|
||||
-record(session, {
|
||||
%% ClientId: Identifier of Session
|
||||
|
@ -67,20 +63,11 @@
|
|||
%% Client’s subscriptions.
|
||||
subscriptions :: list(),
|
||||
|
||||
%% Inflight window size
|
||||
inflight_window = 40,
|
||||
|
||||
%% Inflight qos1, qos2 messages sent to the client but unacked,
|
||||
%% QoS 1 and QoS 2 messages which have been sent to the Client,
|
||||
%% but have not been completely acknowledged.
|
||||
%% Client <- Broker
|
||||
inflight_queue :: list(),
|
||||
|
||||
%% Inflight qos2 messages received from client and waiting for pubrel.
|
||||
%% QoS 2 messages which have been received from the Client,
|
||||
%% but have not been completely acknowledged.
|
||||
%% Client -> Broker
|
||||
awaiting_queue :: list(),
|
||||
inflight_window :: emqttd_mqwin:mqwin(),
|
||||
|
||||
%% All qos1, qos2 messages published to when client is disconnected.
|
||||
%% QoS 1 and QoS 2 messages pending transmission to the Client.
|
||||
|
@ -88,18 +75,22 @@
|
|||
%% Optionally, QoS 0 messages pending transmission to the Client.
|
||||
pending_queue :: emqttd_mqueue:mqueue(),
|
||||
|
||||
%% Inflight qos2 messages received from client and waiting for pubrel.
|
||||
%% QoS 2 messages which have been received from the Client,
|
||||
%% but have not been completely acknowledged.
|
||||
%% Client -> Broker
|
||||
awaiting_rel :: map(),
|
||||
|
||||
%% Awaiting timers for ack, rel and comp.
|
||||
awaiting_ack :: map(),
|
||||
|
||||
awaiting_rel :: map(),
|
||||
|
||||
awaiting_comp :: map(),
|
||||
|
||||
%% Retries to resend the unacked messages
|
||||
max_unack_retries = 3,
|
||||
unack_retries = 3,
|
||||
|
||||
%% 4, 8, 16 seconds if 3 retries:)
|
||||
unack_retry_after = 4,
|
||||
unack_timeout = 4,
|
||||
|
||||
%% Awaiting PUBREL timeout
|
||||
await_rel_timeout = 8,
|
||||
|
@ -111,7 +102,7 @@
|
|||
|
||||
timestamp}).
|
||||
|
||||
-type session() :: #session{} | pid().
|
||||
-type session() :: #session{}.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Session API
|
||||
|
@ -139,6 +130,7 @@ start({false = _CleanSess, ClientId, ClientPid}) ->
|
|||
resume(SessState = #session{}, _ClientId, _ClientPid) ->
|
||||
SessState;
|
||||
resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
|
||||
?SessProc:
|
||||
gen_server:cast(SessPid, {resume, ClientId, ClientPid}),
|
||||
SessPid.
|
||||
|
||||
|
@ -342,171 +334,6 @@ initial_state(ClientId, ClientPid) ->
|
|||
State = initial_state(ClientId),
|
||||
State#session{client_pid = ClientPid}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Start a session process.
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
start_link(ClientId, ClientPid) ->
|
||||
gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([ClientId, ClientPid]) ->
|
||||
process_flag(trap_exit, true),
|
||||
true = link(ClientPid),
|
||||
State = initial_state(ClientId, ClientPid),
|
||||
MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)),
|
||||
State1 = State#session{pending_queue = MQueue,
|
||||
timestamp = os:timestamp()},
|
||||
{ok, init(emqttd:env(mqtt, session), State1), hibernate}.
|
||||
|
||||
init([], State) ->
|
||||
State;
|
||||
|
||||
%% Session expired after hours
|
||||
init([{expired_after, Hours} | Opts], State) ->
|
||||
init(Opts, State#session{sess_expired_after = Hours * 3600});
|
||||
|
||||
%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
|
||||
init([{max_inflight_messages, MaxInflight} | Opts], State) ->
|
||||
init(Opts, State#session{inflight_window = MaxInflight});
|
||||
|
||||
%% Max retries for unacknolege Qos1/2 messages
|
||||
init([{max_unack_retries, Retries} | Opts], State) ->
|
||||
init(Opts, State#session{max_unack_retries = Retries});
|
||||
|
||||
%% Retry after 4, 8, 16 seconds
|
||||
init([{unack_retry_after, Secs} | Opts], State) ->
|
||||
init(Opts, State#session{unack_retry_after = Secs});
|
||||
|
||||
%% Awaiting PUBREL timeout
|
||||
init([{await_rel_timeout, Secs} | Opts], State) ->
|
||||
init(Opts, State#session{await_rel_timeout = Secs});
|
||||
|
||||
init([Opt | Opts], State) ->
|
||||
lager:error("Bad Session Option: ~p", [Opt]),
|
||||
init(Opts, State).
|
||||
|
||||
handle_call({subscribe, Topics}, _From, State) ->
|
||||
{ok, NewState, GrantedQos} = subscribe(State, Topics),
|
||||
{reply, {ok, GrantedQos}, NewState};
|
||||
|
||||
handle_call({unsubscribe, Topics}, _From, State) ->
|
||||
{ok, NewState} = unsubscribe(State, Topics),
|
||||
{reply, ok, NewState};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:error("Unexpected request: ~p", [Req]),
|
||||
{reply, error, State}.
|
||||
|
||||
handle_cast({resume, ClientId, ClientPid}, State = #session{
|
||||
clientid = ClientId,
|
||||
client_pid = OldClientPid,
|
||||
msg_queue = Queue,
|
||||
awaiting_ack = AwaitingAck,
|
||||
awaiting_comp = AwaitingComp,
|
||||
expire_timer = ETimer}) ->
|
||||
|
||||
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
|
||||
|
||||
%% kick old client...
|
||||
if
|
||||
OldClientPid =:= undefined ->
|
||||
ok;
|
||||
OldClientPid =:= ClientPid ->
|
||||
ok;
|
||||
true ->
|
||||
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
|
||||
unlink(OldClientPid),
|
||||
OldClientPid ! {stop, duplicate_id, ClientPid}
|
||||
end,
|
||||
|
||||
%% cancel timeout timer
|
||||
emqttd_util:cancel_timer(ETimer),
|
||||
|
||||
%% redelivery PUBREL
|
||||
lists:foreach(fun(PacketId) ->
|
||||
ClientPid ! {redeliver, {?PUBREL, PacketId}}
|
||||
end, maps:keys(AwaitingComp)),
|
||||
|
||||
%% redelivery messages that awaiting PUBACK or PUBREC
|
||||
Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end,
|
||||
lists:foreach(fun(Msg) ->
|
||||
ClientPid ! {dispatch, {self(), Dup(Msg)}}
|
||||
end, maps:values(AwaitingAck)),
|
||||
|
||||
%% send offline messages
|
||||
lists:foreach(fun(Msg) ->
|
||||
ClientPid ! {dispatch, {self(), Msg}}
|
||||
end, emqttd_queue:all(Queue)),
|
||||
|
||||
{noreply, State#session{client_pid = ClientPid,
|
||||
msg_queue = emqttd_queue:clear(Queue),
|
||||
expire_timer = undefined}, hibernate};
|
||||
|
||||
handle_cast({publish, ClientId, {?QOS_2, Message}}, State) ->
|
||||
NewState = publish(State, ClientId, {?QOS_2, Message}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({puback, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBACK, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrec, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREC, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrel, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREL, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubcomp, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBCOMP, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
|
||||
lager:warning("Session ~s destroyed", [ClientId]),
|
||||
{stop, normal, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
|
||||
F = fun(Message, S) -> dispatch(Message, S) end,
|
||||
{noreply, lists:foldl(F, State, Messages)};
|
||||
|
||||
handle_info({dispatch, {_From, Message}}, State) ->
|
||||
{noreply, dispatch(Message, State)};
|
||||
|
||||
handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId,
|
||||
client_pid = ClientPid}) ->
|
||||
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
|
||||
{noreply, start_expire_timer(State#session{client_pid = undefined})};
|
||||
|
||||
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
|
||||
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(session_expired, State = #session{clientid = ClientId}) ->
|
||||
lager:warning("Session ~s expired!", [ClientId]),
|
||||
{stop, {shutdown, expired}, State};
|
||||
|
||||
handle_info({timeout, awaiting_rel, MsgId}, SessState) ->
|
||||
NewState = timeout(awaiting_rel, MsgId, SessState),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Internal functions
|
||||
|
|
|
@ -27,3 +27,186 @@
|
|||
|
||||
-module(emqttd_session_proc).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include("emqttd.hrl").
|
||||
|
||||
-include_lib("emqtt/include/emqtt.hrl").
|
||||
|
||||
-include_lib("emqtt/include/emqtt_packet.hrl").
|
||||
|
||||
%% Start gen_server
|
||||
-export([start_link/2]).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc Start a session process.
|
||||
%% @end
|
||||
%%------------------------------------------------------------------------------
|
||||
start_link(ClientId, ClientPid) ->
|
||||
gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
init([ClientId, ClientPid]) ->
|
||||
process_flag(trap_exit, true),
|
||||
true = link(ClientPid),
|
||||
State = initial_state(ClientId, ClientPid),
|
||||
MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)),
|
||||
State1 = State#session{pending_queue = MQueue,
|
||||
timestamp = os:timestamp()},
|
||||
{ok, init(emqttd:env(mqtt, session), State1), hibernate}.
|
||||
|
||||
init([], State) ->
|
||||
State;
|
||||
|
||||
%% Session expired after hours
|
||||
init([{expired_after, Hours} | Opts], State) ->
|
||||
init(Opts, State#session{sess_expired_after = Hours * 3600});
|
||||
|
||||
%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
|
||||
init([{max_inflight_messages, MaxInflight} | Opts], State) ->
|
||||
init(Opts, State#session{inflight_window = MaxInflight});
|
||||
|
||||
%% Max retries for unacknolege Qos1/2 messages
|
||||
init([{max_unack_retries, Retries} | Opts], State) ->
|
||||
init(Opts, State#session{max_unack_retries = Retries});
|
||||
|
||||
%% Retry after 4, 8, 16 seconds
|
||||
init([{unack_retry_after, Secs} | Opts], State) ->
|
||||
init(Opts, State#session{unack_retry_after = Secs});
|
||||
|
||||
%% Awaiting PUBREL timeout
|
||||
init([{await_rel_timeout, Secs} | Opts], State) ->
|
||||
init(Opts, State#session{await_rel_timeout = Secs});
|
||||
|
||||
init([Opt | Opts], State) ->
|
||||
lager:error("Bad Session Option: ~p", [Opt]),
|
||||
init(Opts, State).
|
||||
|
||||
handle_call({subscribe, Topics}, _From, State) ->
|
||||
{ok, NewState, GrantedQos} = subscribe(State, Topics),
|
||||
{reply, {ok, GrantedQos}, NewState};
|
||||
|
||||
handle_call({unsubscribe, Topics}, _From, State) ->
|
||||
{ok, NewState} = unsubscribe(State, Topics),
|
||||
{reply, ok, NewState};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:error("Unexpected request: ~p", [Req]),
|
||||
{reply, error, State}.
|
||||
|
||||
handle_cast({resume, ClientId, ClientPid}, State = #session{
|
||||
clientid = ClientId,
|
||||
client_pid = OldClientPid,
|
||||
msg_queue = Queue,
|
||||
awaiting_ack = AwaitingAck,
|
||||
awaiting_comp = AwaitingComp,
|
||||
expire_timer = ETimer}) ->
|
||||
|
||||
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
|
||||
|
||||
%% kick old client...
|
||||
if
|
||||
OldClientPid =:= undefined ->
|
||||
ok;
|
||||
OldClientPid =:= ClientPid ->
|
||||
ok;
|
||||
true ->
|
||||
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
|
||||
unlink(OldClientPid),
|
||||
OldClientPid ! {stop, duplicate_id, ClientPid}
|
||||
end,
|
||||
|
||||
%% cancel timeout timer
|
||||
emqttd_util:cancel_timer(ETimer),
|
||||
|
||||
%% redelivery PUBREL
|
||||
lists:foreach(fun(PacketId) ->
|
||||
ClientPid ! {redeliver, {?PUBREL, PacketId}}
|
||||
end, maps:keys(AwaitingComp)),
|
||||
|
||||
%% redelivery messages that awaiting PUBACK or PUBREC
|
||||
Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end,
|
||||
lists:foreach(fun(Msg) ->
|
||||
ClientPid ! {dispatch, {self(), Dup(Msg)}}
|
||||
end, maps:values(AwaitingAck)),
|
||||
|
||||
%% send offline messages
|
||||
lists:foreach(fun(Msg) ->
|
||||
ClientPid ! {dispatch, {self(), Msg}}
|
||||
end, emqttd_queue:all(Queue)),
|
||||
|
||||
{noreply, State#session{client_pid = ClientPid,
|
||||
msg_queue = emqttd_queue:clear(Queue),
|
||||
expire_timer = undefined}, hibernate};
|
||||
|
||||
|
||||
handle_cast({publish, ClientId, {?QOS_2, Message}}, State) ->
|
||||
NewState = publish(State, ClientId, {?QOS_2, Message}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({puback, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBACK, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrec, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREC, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrel, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREL, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubcomp, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBCOMP, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
|
||||
lager:warning("Session ~s destroyed", [ClientId]),
|
||||
{stop, normal, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
|
||||
F = fun(Message, S) -> dispatch(Message, S) end,
|
||||
{noreply, lists:foldl(F, State, Messages)};
|
||||
|
||||
handle_info({dispatch, {_From, Message}}, State) ->
|
||||
{noreply, dispatch(Message, State)};
|
||||
|
||||
handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId,
|
||||
client_pid = ClientPid}) ->
|
||||
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
|
||||
{noreply, start_expire_timer(State#session{client_pid = undefined})};
|
||||
|
||||
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
|
||||
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(session_expired, State = #session{clientid = ClientId}) ->
|
||||
lager:warning("Session ~s expired!", [ClientId]),
|
||||
{stop, {shutdown, expired}, State};
|
||||
|
||||
handle_info({timeout, awaiting_rel, MsgId}, SessState) ->
|
||||
NewState = timeout(awaiting_rel, MsgId, SessState),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
|
|
|
@ -89,8 +89,6 @@
|
|||
{session, [
|
||||
%% Expired after 2 days
|
||||
{expired_after, 48},
|
||||
%% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
|
||||
{max_inflight_messages, 40},
|
||||
%% Max retries for unacknolege Qos1/2 messages
|
||||
{max_unack_retries, 3},
|
||||
%% Retry after 4, 8, 16 seconds
|
||||
|
@ -99,14 +97,20 @@
|
|||
{await_rel_timeout, 8}
|
||||
]},
|
||||
{queue, [
|
||||
%% Max messages queued when client is disconnected, or inflight messsage window is overload
|
||||
{max_queued_messages, 200},
|
||||
%% High watermark of queued messsages
|
||||
{high_queue_watermark, 0.8},
|
||||
%% Max queue length
|
||||
{max_length, 1000},
|
||||
|
||||
%% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
|
||||
{inflight_window, 100},
|
||||
|
||||
%% Low watermark of queued messsages
|
||||
{low_queue_watermark, 0.2},
|
||||
%% Queue Qos0 offline messages?
|
||||
{queue_qos0_messages, true}
|
||||
{low_watermark, 0.2},
|
||||
|
||||
%% High watermark of queued messsages
|
||||
{high_watermark, 0.6},
|
||||
|
||||
%% Queue Qos0 messages?
|
||||
{queue_qos0, true}
|
||||
]}
|
||||
]},
|
||||
%% Broker Options
|
||||
|
|
Loading…
Reference in New Issue