session upgrade

This commit is contained in:
Feng Lee 2015-06-11 23:34:53 +08:00
parent 4313ed0cf3
commit 517c7eb7b6
3 changed files with 134 additions and 77 deletions

View File

@ -272,7 +272,7 @@ send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when i
%% message(qos1, qos2) not from session %% message(qos1, qos2) not from session
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
{Message1, NewSession} = emqttd_session:store(Session, Message), {Message1, NewSession} = emqttd_session:await_ack(Session, Message),
send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}); send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) -> send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->

View File

@ -24,6 +24,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_session). -module(emqttd_session).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -53,7 +54,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(session_state, { -record(session, {
%% ClientId: Identifier of Session %% ClientId: Identifier of Session
clientid :: binary(), clientid :: binary(),
@ -100,14 +101,17 @@
%% 4, 8, 16 seconds if 3 retries:) %% 4, 8, 16 seconds if 3 retries:)
unack_retry_after = 4, unack_retry_after = 4,
%% session expired %% Awaiting PUBREL timeout
sess_expired_after = 48, await_rel_timeout = 8,
%% session expired after 48 hours
sess_expired_after = 172800,
sess_expired_timer, sess_expired_timer,
timestamp}). timestamp }).
-type session() :: #session_state{} | pid(). -type session() :: #session{} | pid().
%%%============================================================================= %%%=============================================================================
%%% Session API %%% Session API
@ -132,7 +136,7 @@ start({false = _CleanSess, ClientId, ClientPid}) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(session(), binary(), pid()) -> session(). -spec resume(session(), binary(), pid()) -> session().
resume(SessState = #session_state{}, _ClientId, _ClientPid) -> resume(SessState = #session{}, _ClientId, _ClientPid) ->
SessState; SessState;
resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientId, ClientPid}), gen_server:cast(SessPid, {resume, ClientId, ClientPid}),
@ -149,10 +153,12 @@ publish(Session, ClientId, {?QOS_0, Message}) ->
publish(Session, ClientId, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
emqttd_pubsub:publish(ClientId, Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, publish(SessState = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}, _ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
%% store in awaiting_rel %% store in awaiting_rel
SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}),
SessState#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)};
publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}),
@ -163,59 +169,72 @@ publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
puback(SessState = #session_state{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> puback(SessState = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
case maps:is_key(PacketId, Awaiting) of case maps:is_key(PacketId, Awaiting) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_ack = maps:remove(PacketId, Awaiting)}; SessState#session{awaiting_ack = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {puback, PacketId}), SessPid; gen_server:cast(SessPid, {puback, PacketId}), SessPid;
%% PUBREC %% PUBREC
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session{clientid = ClientId,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) ->
case maps:is_key(PacketId, AwaitingAck) of case maps:is_key(PacketId, AwaitingAck) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_ack = maps:remove(PacketId, AwaitingAck), SessState#session{awaiting_ack = maps:remove(PacketId, AwaitingAck),
awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; awaiting_comp = maps:put(PacketId, true, AwaitingComp)};
puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
%% PUBREL %% PUBREL
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqttd_pubsub:publish(ClientId, Msg); {ok, {Msg, TRef}} ->
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) catch erlang:cancel_timer(TRef),
emqttd_pubsub:publish(ClientId, Msg);
error ->
lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; SessState#session{awaiting_rel = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrel, PacketId}), SessPid; gen_server:cast(SessPid, {pubrel, PacketId}), SessPid;
%% PUBCOMP %% PUBCOMP
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session{clientid = ClientId,
awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) ->
case maps:is_key(PacketId, AwaitingComp) of case maps:is_key(PacketId, AwaitingComp) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; SessState#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)};
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid.
timeout(awaiting_rel, MsgId, SessState = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
case maps:find(MsgId, Awaiting) of
{ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]),
SessState#session{awaiting_rel = maps:remove(MsgId, Awaiting)};
error ->
lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]),
SessState
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Subscribe Topics %% @doc Subscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> subscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
@ -242,7 +261,7 @@ subscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscr
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
{ok, SessState#session_state{subscriptions = Subscriptions1}, GrantedQos}; {ok, SessState#session{subscriptions = Subscriptions1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) -> subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
@ -253,17 +272,23 @@ subscribe(SessPid, Topics) when is_pid(SessPid) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}. -spec unsubscribe(session(), [binary()]) -> {ok, session()}.
unsubscribe(SessState = #session_state{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> unsubscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%%TODO: refactor later.
case Topics -- maps:keys(SubMap) of
[] -> ok;
BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs])
end,
%%unsubscribe from topic tree %%unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),
lager:info([{client, ClientId}], "Client ~s unsubscribe ~p.", [ClientId, Topics]), lager:info([{client, ClientId}], "Client ~s unsubscribe ~p.", [ClientId, Topics]),
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}}; Subscriptions1 =
lists:foldl(fun(Topic, Acc) ->
case lists:keyfind(Topic, 1, Acc) of
{Topic, _Qos} ->
lists:keydelete(Topic, 1, Acc);
false ->
lager:warning([{client, ClientId}], "~s not subscribe ~s", [ClientId, Topic]), Acc
end
end, Subscriptions, Topics),
{ok, SessState#session{subscriptions = Subscriptions1}};
unsubscribe(SessPid, Topics) when is_pid(SessPid) -> unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}), gen_server:call(SessPid, {unsubscribe, Topics}),
@ -277,31 +302,45 @@ unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
destroy(SessPid, ClientId) when is_pid(SessPid) -> destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}). gen_server:cast(SessPid, {destroy, ClientId}).
%store message(qos1) that sent to client % message(qos1) is awaiting ack
store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting}, await_ack(Msg = #mqtt_message{qos = ?QOS_1}, SessState = #session{message_id = MsgId,
Message = #mqtt_message{qos = Qos}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> inflight_queue = InflightQ,
awaiting_ack = Awaiting,
unack_retry_after = Time,
max_unack_retries = Retries}) ->
%% assign msgid before send
Msg1 = Msg#mqtt_message{msgid = MsgId},
TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}),
Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting),
{Msg1, next_msgid(SessState#session{inflight_queue = [{MsgId, Msg1} | InflightQ],
awaiting_ack = Awaiting1})}.
% message(qos2) is awaiting ack
await_ack(Message = #mqtt_message{qos = Qos}, SessState = #session{message_id = MsgId, awaiting_ack = Awaiting},)
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
%%assign msgid before send %%assign msgid before send
Message1 = Message#mqtt_message{msgid = MsgId}, Message1 = Message#mqtt_message{msgid = MsgId, dup = false},
Message2 = Message2 =
if if
Qos =:= ?QOS_2 -> Message1#mqtt_message{dup = false}; Qos =:= ?QOS_2 -> Message1#mqtt_message{dup = false};
true -> Message1 true -> Message1
end, end,
Awaiting1 = maps:put(MsgId, Message2, Awaiting), Awaiting1 = maps:put(MsgId, Message2, Awaiting),
{Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}. {Message1, next_msgid(SessState#session{awaiting_ack = Awaiting1})}.
initial_state(ClientId) -> initial_state(ClientId) ->
#session_state{clientid = ClientId, %%TODO: init session options.
subscriptions = [], #session{clientid = ClientId,
inflight_queue = [], subscriptions = [],
awaiting_queue = [], inflight_queue = [],
awaiting_ack = #{}, awaiting_queue = [],
awaiting_rel = #{}, awaiting_ack = #{},
awaiting_comp = #{}}. awaiting_rel = #{},
awaiting_comp = #{}}.
initial_state(ClientId, ClientPid) -> initial_state(ClientId, ClientPid) ->
State = initial_state(ClientId), State = initial_state(ClientId),
State#session_state{client_pid = ClientPid}. State#session{client_pid = ClientPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a session process. %% @doc Start a session process.
@ -319,7 +358,7 @@ init([ClientId, ClientPid]) ->
true = link(ClientPid), true = link(ClientPid),
State = initial_state(ClientId, ClientPid), State = initial_state(ClientId, ClientPid),
MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)), MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)),
State1 = State#session_state{pending_queue = MQueue, State1 = State#session{pending_queue = MQueue,
timestamp = os:timestamp()}, timestamp = os:timestamp()},
{ok, init(emqttd:env(mqtt, session), State1), hibernate}. {ok, init(emqttd:env(mqtt, session), State1), hibernate}.
@ -328,19 +367,23 @@ init([], State) ->
%% Session expired after hours %% Session expired after hours
init([{expired_after, Hours} | Opts], State) -> init([{expired_after, Hours} | Opts], State) ->
init(Opts, State#session_state{sess_expired_after = Hours * 3600 * 1000}); init(Opts, State#session{sess_expired_after = Hours * 3600});
%% Max number of QoS 1 and 2 messages that can be inflight at one time. %% Max number of QoS 1 and 2 messages that can be inflight at one time.
init([{max_inflight_messages, MaxInflight} | Opts], State) -> init([{max_inflight_messages, MaxInflight} | Opts], State) ->
init(Opts, State#session_state{inflight_window = MaxInflight}); init(Opts, State#session{inflight_window = MaxInflight});
%% Max retries for unacknolege Qos1/2 messages %% Max retries for unacknolege Qos1/2 messages
init([{max_unack_retries, Retries} | Opts], State) -> init([{max_unack_retries, Retries} | Opts], State) ->
init(Opts, State#session_state{max_unack_retries = Retries}); init(Opts, State#session{max_unack_retries = Retries});
%% Retry after 4, 8, 16 seconds %% Retry after 4, 8, 16 seconds
init([{unack_retry_after, Secs} | Opts], State) -> init([{unack_retry_after, Secs} | Opts], State) ->
init(Opts, State#session_state{unack_retry_after = Secs * 1000}); init(Opts, State#session{unack_retry_after = Secs});
%% Awaiting PUBREL timeout
init([{await_rel_timeout, Secs} | Opts], State) ->
init(Opts, State#session{await_rel_timeout = Secs});
init([Opt | Opts], State) -> init([Opt | Opts], State) ->
lager:error("Bad Session Option: ~p", [Opt]), lager:error("Bad Session Option: ~p", [Opt]),
@ -358,7 +401,7 @@ handle_call(Req, _From, State) ->
lager:error("Unexpected request: ~p", [Req]), lager:error("Unexpected request: ~p", [Req]),
{reply, error, State}. {reply, error, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session_state{ handle_cast({resume, ClientId, ClientPid}, State = #session{
clientid = ClientId, clientid = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
msg_queue = Queue, msg_queue = Queue,
@ -399,7 +442,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
ClientPid ! {dispatch, {self(), Msg}} ClientPid ! {dispatch, {self(), Msg}}
end, emqttd_queue:all(Queue)), end, emqttd_queue:all(Queue)),
{noreply, State#session_state{client_pid = ClientPid, {noreply, State#session{client_pid = ClientPid,
msg_queue = emqttd_queue:clear(Queue), msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate}; expire_timer = undefined}, hibernate};
@ -423,7 +466,7 @@ handle_cast({pubcomp, PacketId}, State) ->
NewState = puback(State, {?PUBCOMP, PacketId}), NewState = puback(State, {?PUBCOMP, PacketId}),
{noreply, NewState}; {noreply, NewState};
handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) -> handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]), lager:warning("Session ~s destroyed", [ClientId]),
{stop, normal, State}; {stop, normal, State};
@ -438,19 +481,23 @@ handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
handle_info({dispatch, {_From, Message}}, State) -> handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)}; {noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId, handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
{noreply, start_expire_timer(State#session_state{client_pid = undefined})}; {noreply, start_expire_timer(State#session{client_pid = undefined})};
handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) -> handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
{noreply, State}; {noreply, State};
handle_info(session_expired, State = #session_state{clientid = ClientId}) -> handle_info(session_expired, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s expired!", [ClientId]), lager:warning("Session ~s expired!", [ClientId]),
{stop, {shutdown, expired}, State}; {stop, {shutdown, expired}, State};
handle_info({timeout, awaiting_rel, MsgId}, SessState) ->
NewState = timeout(awaiting_rel, MsgId, SessState),
{noreply, NewState};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
{noreply, State}. {noreply, State}.
@ -465,32 +512,40 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
dispatch(Message, State = #session_state{clientid = ClientId, %% client is offline
client_pid = undefined}) -> dispatch(Msg, SessState = #session{client_pid = undefined}) ->
queue(ClientId, Message, State); queue(Msg, SessState);
dispatch(Message = #mqtt_message{qos = ?QOS_0}, State = #session_state{client_pid = ClientPid}) -> %% dispatch qos0 directly
ClientPid ! {dispatch, {self(), Message}}, dispatch(Msg = #mqtt_message{qos = ?QOS_0}, SessState = #session{client_pid = ClientPid}) ->
State; ClientPid ! {dispatch, {self(), Msg}}, SessState;
dispatch(Message = #mqtt_message{qos = Qos}, State = #session_state{client_pid = ClientPid}) %% queue if inflight_queue is full
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{inflight_window = InflightWin,
{Message1, NewState} = store(State, Message), inflight_queue = InflightQ})
ClientPid ! {dispatch, {self(), Message1}}, when (Qos > ?QOS_0) andalso (length(InflightQ) >= InflightWin) ->
NewState. %%TODO: set alarms
lager:error([{clientid, ClientId}], "Session ~s inflight_queue is full!", [ClientId]),
queue(Msg, SessState);
queue(ClientId, Message, State = #session_state{msg_queue = Queue}) -> %% dispatch and await ack
State#session_state{msg_queue = emqttd_queue:in(ClientId, Message, Queue)}. dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{client_pid = ClientPid})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
%% assign msgid and await
{NewMsg, NewState} = await_ack(Msg, SessState),
ClientPid ! {dispatch, {self(), NewMsg}},
next_msg_id(State = #session_state{message_id = 16#ffff}) -> queue(Msg, SessState = #session{pending_queue = Queue}) ->
State#session_state{message_id = 1}; SessState#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}.
next_msg_id(State = #session_state{message_id = MsgId}) -> next_msgid(State = #session{message_id = 16#ffff}) ->
State#session_state{message_id = MsgId + 1}. State#session{message_id = 1};
start_expire_timer(State = #session_state{expires = Expires, next_msgid(State = #session{message_id = MsgId}) ->
expire_timer = OldTimer}) -> State#session{message_id = MsgId + 1}.
start_expire_timer(State = #session{expires = Expires, expire_timer = OldTimer}) ->
emqttd_util:cancel_timer(OldTimer), emqttd_util:cancel_timer(OldTimer),
Timer = erlang:send_after(Expires * 1000, self(), session_expired), Timer = erlang:send_after(Expires * 1000, self(), session_expired),
State#session_state{expire_timer = Timer}. State#session{expire_timer = Timer}.

View File

@ -94,7 +94,9 @@
%% Max retries for unacknolege Qos1/2 messages %% Max retries for unacknolege Qos1/2 messages
{max_unack_retries, 3}, {max_unack_retries, 3},
%% Retry after 4, 8, 16 seconds %% Retry after 4, 8, 16 seconds
{unack_retry_after, 4} {unack_retry_after, 4},
%% Awaiting PUBREL timeout
{await_rel_timeout, 8}
]}, ]},
{queue, [ {queue, [
%% Max messages queued when client is disconnected, or inflight messsage window is overload %% Max messages queued when client is disconnected, or inflight messsage window is overload