From a0f90b3ac632a839e7f2503bfa17f76a932667c7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 14 Jun 2015 19:24:03 +0800 Subject: [PATCH] upgrade session --- apps/emqtt/include/emqtt.hrl | 2 +- apps/emqtt/include/emqtt_packet.hrl | 2 +- apps/emqtt/src/emqtt_message.erl | 5 +- apps/emqttd/src/emqttd_bridge.erl | 4 +- apps/emqttd/src/emqttd_client.erl | 12 +- .../{emqttd_mqwin.erl => emqttd_inflight.erl} | 47 +- apps/emqttd/src/emqttd_mqueue.erl | 20 +- apps/emqttd/src/emqttd_msg_store.erl | 4 +- apps/emqttd/src/emqttd_protocol.erl | 85 ++- apps/emqttd/src/emqttd_pubsub.erl | 4 +- apps/emqttd/src/emqttd_session.erl | 557 +++++++++--------- apps/emqttd/src/emqttd_session_sup.erl | 15 +- apps/emqttd/src/emqttd_sm.erl | 56 +- apps/emqttd/src/emqttd_ws_client.erl | 11 +- rel/files/emqttd.config | 12 +- 15 files changed, 422 insertions(+), 414 deletions(-) rename apps/emqttd/src/{emqttd_mqwin.erl => emqttd_inflight.erl} (62%) diff --git a/apps/emqtt/include/emqtt.hrl b/apps/emqtt/include/emqtt.hrl index 49a19cf62..e0a2aab63 100644 --- a/apps/emqtt/include/emqtt.hrl +++ b/apps/emqtt/include/emqtt.hrl @@ -57,7 +57,7 @@ -record(mqtt_message, { topic :: binary(), %% topic published to - from :: mqtt_clientid() | atom(), %% from clientid + from :: binary() | atom(), %% from clientid qos = ?QOS_0 :: mqtt_qos(), retain = false :: boolean(), dup = false :: boolean(), diff --git a/apps/emqtt/include/emqtt_packet.hrl b/apps/emqtt/include/emqtt_packet.hrl index 105a939cf..5a965e98c 100644 --- a/apps/emqtt/include/emqtt_packet.hrl +++ b/apps/emqtt/include/emqtt_packet.hrl @@ -162,7 +162,7 @@ #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}). -define(CONNACK_PACKET(ReturnCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{return_code = ReturnCode}}). -define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload), diff --git a/apps/emqtt/src/emqtt_message.erl b/apps/emqtt/src/emqtt_message.erl index 54a46526c..61da1011e 100644 --- a/apps/emqtt/src/emqtt_message.erl +++ b/apps/emqtt/src/emqtt_message.erl @@ -32,7 +32,7 @@ -include("emqtt_packet.hrl"). --export([from_packet/1, to_packet/1]). +-export([from_packet/1, from_packet/2, to_packet/1]). -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). @@ -70,6 +70,9 @@ from_packet(#mqtt_packet_connect{will_retain = Retain, dup = false, payload = Msg}. +from_packet(ClientId, Packet) -> + Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}. + %%------------------------------------------------------------------------------ %% @doc Message to packet %% @end diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 26919b47f..7f2d8345d 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -106,11 +106,11 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}) -> +handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) -> lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]), {noreply, State}; -handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> +handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), {noreply, State}; diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index de5d65d45..a61b3373b 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -106,16 +106,8 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState [emqttd_protocol:clientid(ProtoState), ConnName]), stop({shutdown, duplicate_id}, State); -%%TODO: ok?? -handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) -> - ProtoState1 = - lists:foldl(fun(Message, PState) -> - {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 - end, ProtoState, Messages), - {noreply, State#state{proto_state = ProtoState1}}; - -handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), +handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) -> diff --git a/apps/emqttd/src/emqttd_mqwin.erl b/apps/emqttd/src/emqttd_inflight.erl similarity index 62% rename from apps/emqttd/src/emqttd_mqwin.erl rename to apps/emqttd/src/emqttd_inflight.erl index acc90bf41..1eb69de8f 100644 --- a/apps/emqttd/src/emqttd_mqwin.erl +++ b/apps/emqttd/src/emqttd_inflight.erl @@ -25,42 +25,47 @@ %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_mqwin). +-module(emqttd_inflight). -author("Feng Lee "). --export([new/2, len/1, in/2, ack/2]). +-include_lib("emqtt/include/emqtt.hrl"). --define(WIN_SIZE, 100). +-export([new/2, is_full/1, len/1, in/2, ack/2]). --record(mqwin, {name, - w = [], %% window list - len = 0, %% current window len - size = ?WIN_SIZE}). +-define(MAX_SIZE, 100). --type mqwin() :: #mqwin{}. +-record(inflight, {name, q = [], len = 0, size = ?MAX_SIZE}). --export_type([mqwin/0]). +-type inflight() :: #inflight{}. -new(Name, Opts) -> - WinSize = emqttd_opts:g(inflight_window, Opts, ?WIN_SIZE), - #mqwin{name = Name, size = WinSize}. +-export_type([inflight/0]). -len(#mqwin{len = Len}) -> +new(Name, Max) -> + #inflight{name = Name, size = Max}. + +is_full(#inflight{size = 0}) -> + false; +is_full(#inflight{len = Len, size = Size}) when Len < Size -> + false; +is_full(_Inflight) -> + true. + +len(#inflight{len = Len}) -> Len. -in(_Msg, #mqwin{len = Len, size = Size}) +in(_Msg, #inflight{len = Len, size = Size}) when Len =:= Size -> {error, full}; -in(Msg, Win = #mqwin{w = W, len = Len}) -> - {ok, Win#mqwin{w = [Msg|W], len = Len +1}}. +in(Msg = #mqtt_message{msgid = MsgId}, Inflight = #inflight{q = Q, len = Len}) -> + {ok, Inflight#inflight{q = [{MsgId, Msg}|Q], len = Len +1}}. -ack(MsgId, QWin = #mqwin{w = W, len = Len}) -> - case lists:keyfind(MsgId, 2, W) of +ack(MsgId, Inflight = #inflight{q = Q, len = Len}) -> + case lists:keyfind(MsgId, 1, Q) of false -> - lager:error("qwin(~s) cannot find msgid: ~p", [MsgId]), QWin; + lager:error("Inflight(~s) cannot find msgid: ~p", [MsgId]), + Inflight; _Msg -> - QWin#mqwin{w = lists:keydelete(MsgId, 2, W), len = Len - 1} + Inflight#inflight{q = lists:keydelete(MsgId, 1, Q), len = Len - 1} end. - diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index 84381f036..6a37a707e 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -56,7 +56,7 @@ -export([new/2, name/1, is_empty/1, is_full/1, - len/1, in/2, out/2]). + len/1, in/2, out/1]). -define(LOW_WM, 0.2). @@ -112,22 +112,30 @@ len(#mqueue{len = Len}) -> Len. %% @doc Queue one message. %% @end %%------------------------------------------------------------------------------ --spec in(mqtt_message(), mqueue()) -> mqueue(). + +-spec in({new | old, mqtt_message()}, mqueue()) -> mqueue(). %% drop qos0 -in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> +in({_, #mqtt_message{qos = ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> MQ; %% simply drop the oldest one if queue is full, improve later -in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen}) +in({new, Msg}, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen}) when Len =:= MaxLen -> {{value, OldMsg}, Q2} = queue:out(Q), lager:error("queue(~s) drop message: ~p", [Name, OldMsg]), MQ#mqueue{q = queue:in(Msg, Q2)}; -in(Msg, MQ = #mqueue{q = Q, len = Len}) -> +in({old, Msg}, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen}) + when Len =:= MaxLen -> + lager:error("queue(~s) drop message: ~p", [Name, Msg]), MQ; + +in({new, Msg}, MQ = #mqueue{q = Q, len = Len}) -> maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); +in({old, Msg}, MQ = #mqueue{q = Q, len = Len}) -> + MQ#mqueue{q = queue:in_r(Msg, Q), len = Len + 1}. + out(MQ = #mqueue{len = 0}) -> {empty, MQ}; @@ -143,7 +151,7 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm = f maybe_set_alarm(MQ) -> MQ. -maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alarm = true}) +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm = true}) when Len =< LowWM -> emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alarm = false}; diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index 511b7cb22..c0b37cbc8 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -123,7 +123,7 @@ redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> dispatch(_CPid, []) -> ignore; dispatch(CPid, Msgs) when is_list(Msgs) -> - CPid ! {dispatch, {self(), [Msg || Msg <- Msgs]}}; + CPid ! {dispatch, [Msg || Msg <- Msgs]}; dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> - CPid ! {dispatch, {self(), Msg}}. + CPid ! {dispatch, Msg}. diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index bd500ab26..3626c6203 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -51,8 +51,7 @@ username, clientid, clean_sess, - sessmod, - session, %% session state or session pid + session, will_msg, max_clientid_len = ?MAX_CLIENTID_LEN, client_pid @@ -152,13 +151,13 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} emqttd_cm:register(client(State2)), %%Starting session - {ok, SessMod, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), + {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), %% Start keepalive start_keepalive(KeepAlive), %% ACCEPT - {?CONNACK_ACCEPT, State2#proto_state{sessmod = SessMod, session = Session, will_msg = willmsg(Var)}}; + {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; {error, Reason}-> lager:error("~s@~s: username '~s', login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), @@ -177,7 +176,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - do_publish(Session, ClientId, ?QOS_0, Packet); + do_publish(Session, ClientId, Packet); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) end, @@ -187,7 +186,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - do_publish(Session, ClientId, ?QOS_1, Packet), + do_publish(Session, ClientId, Packet), send(?PUBACK_PACKET(?PUBACK, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -198,26 +197,28 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - NewSession = do_publish(Session, ClientId, ?QOS_2, Packet), - send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); + do_publish(Session, ClientId, Packet), + send(?PUBACK_PACKET(?PUBREC, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), {ok, State} end; -handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) - when Type >= ?PUBACK andalso Type =< ?PUBCOMP -> - NewSession = emqttd_session:puback(Session, {Type, PacketId}), - NewState = State#proto_state{session = NewSession}, - if - Type =:= ?PUBREC -> - send(?PUBREL_PACKET(PacketId), NewState); - Type =:= ?PUBREL -> - send(?PUBACK_PACKET(?PUBCOMP, PacketId), NewState); - true -> - ok - end, - {ok, NewState}; +handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:puback(Session, PacketId), + {ok, State}; + +handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:pubrec(Session, PacketId), + send(?PUBREL_PACKET(PacketId), State); + +handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:pubrel(Session, PacketId), + send(?PUBACK_PACKET(?PUBCOMP, PacketId), State); + +handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) -> + emqttd_session:pubcomp(Session, PacketId), + {ok, State}; %% protect from empty topic list handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> @@ -233,13 +234,13 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = false -> TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), %%TODO: GrantedQos should be renamed. - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), - send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) + {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), + send(?SUBACK_PACKET(PacketId, GrantedQos), State) end; -handle({subscribe, Topic, Qos}, State = #proto_state{session = Session}) -> - {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), - {ok, State#proto_state{session = NewSession}}; +handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> + {ok, _GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + {ok, State}; %% protect from empty topic list handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> @@ -247,34 +248,24 @@ handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), - {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics1), - send(?UNSUBACK_PACKET(PacketId), State#proto_state{session = NewSession}); + ok = emqttd_session:unsubscribe(Session, Topics1), + send(?UNSUBACK_PACKET(PacketId), State); handle(?PACKET(?PINGREQ), State) -> send(?PACKET(?PINGRESP), State); handle(?PACKET(?DISCONNECT), State) -> - %%TODO: how to handle session? % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -do_publish(Session, ClientId, Qos, Packet) -> - Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)), - emqttd_session:publish(Session, ClientId, {Qos, Message}). +do_publish(Session, ClientId, Packet) -> + Msg = emqtt_message:from_packet(ClientId, Packet), + Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg), + emqttd_session:publish(Session, Msg1). --spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}. -%% qos0 message -send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> - send(emqtt_message:to_packet(Message), State); - -%% message from session -send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) -> - send(emqtt_message:to_packet(Message), State); -%% message(qos1, qos2) not from session -send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) - when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> - {Message1, NewSession} = emqttd_session:await_ack(Session, Message), - send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}); +-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. +send(Msg, State) when is_record(Msg, mqtt_message) -> + send(emqtt_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), @@ -331,8 +322,8 @@ clientid(ClientId, _State) -> ClientId. send_willmsg(_ClientId, undefined) -> ignore; %%TODO:should call session... -send_willmsg(ClientId, WillMsg) -> - emqttd_pubsub:publish(ClientId, WillMsg). +send_willmsg(ClientId, WillMsg) -> + emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}). start_keepalive(0) -> ignore; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index b89355c76..51fcc3c1e 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -177,7 +177,7 @@ publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; true -> Msg end, - SubPid ! {dispatch, {self(), Msg1}} + SubPid ! {dispatch, Msg1} end, mnesia:dirty_read(queue, Queue)); publish(_From, Topic, Msg) when is_binary(Topic) -> @@ -202,7 +202,7 @@ dispatch(Topic, #mqtt_message{qos = Qos} = Msg ) when is_binary(Topic) -> Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; true -> Msg end, - SubPid ! {dispatch, {self(), Msg1}} + SubPid ! {dispatch, Msg1} end, Subscribers), length(Subscribers). diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 01a910e3c..4cd480165 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -53,31 +53,26 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). -%% Start gen_server --export([start_link/2, resume/3, destroy/2]). - -%% Init Session State --export([new/1]). +%% Session API +-export([start_link/3, resume/3, destroy/2]). %% PubSub APIs --export([publish/3, - puback/2, - subscribe/2, - unsubscribe/2, - await/2, - dispatch/2]). +-export([publish/2, + puback/2, pubrec/2, pubrel/2, pubcomp/2, + subscribe/2, unsubscribe/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(session, { - %% ClientId: Identifier of Session - clientid :: binary(), %% Clean Session Flag clean_sess = true, + %% ClientId: Identifier of Session + clientid :: binary(), + %% Client Pid linked with session client_pid :: pid(), @@ -91,7 +86,7 @@ %% QoS 1 and QoS 2 messages which have been sent to the Client, %% but have not been completely acknowledged. %% Client <- Broker - inflight_window :: emqttd_mqwin:mqwin(), + inflight_queue :: emqttd_inflight:inflight(), %% All qos1, qos2 messages published to when client is disconnected. %% QoS 1 and QoS 2 messages pending transmission to the Client. @@ -129,167 +124,121 @@ timestamp}). --type session() :: #session{}. - -%%%============================================================================= -%%% Session API -%%%============================================================================= - %%------------------------------------------------------------------------------ -%% @doc Start a session process. +%% @doc Start a session. %% @end %%------------------------------------------------------------------------------ -start_link(ClientId, ClientPid) -> - gen_server:start_link(?MODULE, [ClientId, ClientPid], []). +start_link(CleanSess, ClientId, ClientPid) -> + gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). %%------------------------------------------------------------------------------ %% @doc Resume a session. %% @end %%------------------------------------------------------------------------------ -resume(Session, _ClientId, _ClientPid) when is_record(Session, session) -> - Session; -resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> - gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid. +resume(Session, ClientId, ClientPid) when is_pid(Session) -> + gen_server:cast(Session, {resume, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session. %% @end %%------------------------------------------------------------------------------ --spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. -destroy(SessPid, ClientId) when is_pid(SessPid) -> - gen_server:cast(SessPid, {destroy, ClientId}), SessPid. +-spec destroy(Session:: pid(), ClientId :: binary()) -> ok. +destroy(Session, ClientId) when is_pid(Session) -> + gen_server:call(Session, {destroy, ClientId}). %%------------------------------------------------------------------------------ -%% @doc Init Session State. +%% @doc Publish message %% @end %%------------------------------------------------------------------------------ --spec new(binary()) -> session(). -new(ClientId) -> +-spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok. +publish(Session, Msg = #mqtt_message{qos = ?QOS_0}) when is_pid(Session) -> + %% publish qos0 directly + emqttd_pubsub:publish(Msg); + +publish(Session, Msg = #mqtt_message{qos = ?QOS_1}) when is_pid(Session) -> + %% publish qos1 directly, and client will puback + emqttd_pubsub:publish(Msg); + +publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) when is_pid(Session) -> + %% publish qos2 by session + gen_server:cast(Session, {publish, Msg}). + +%%------------------------------------------------------------------------------ +%% @doc PubAck message +%% @end +%%------------------------------------------------------------------------------ +-spec puback(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok. +puback(Session, MsgId) when is_pid(Session) -> + gen_server:cast(Session, {puback, MsgId}). + +-spec pubrec(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok. +pubrec(Session, MsgId) when is_pid(Session) -> + gen_server:cast(Session, {pubrec, MsgId}). + +-spec pubrel(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok. +pubrel(Session, MsgId) when is_pid(Session) -> + gen_server:cast(Session, {pubrel, MsgId}). + +-spec pubcomp(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok. +pubcomp(Session, MsgId) when is_pid(Session) -> + gen_server:cast(Session, {pubcomp, MsgId}). + +%%------------------------------------------------------------------------------ +%% @doc Subscribe Topics +%% @end +%%------------------------------------------------------------------------------ +-spec subscribe(Session :: pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. +subscribe(Session, Topics) when is_pid(Session) -> + gen_server:call(Session, {subscribe, Topics}). + +%%------------------------------------------------------------------------------ +%% @doc Unsubscribe Topics +%% @end +%%------------------------------------------------------------------------------ +-spec unsubscribe(Session :: pid(), [Topic :: binary()]) -> ok. +unsubscribe(Session, Topics) when is_pid(Session) -> + gen_server:call(Session, {unsubscribe, Topics}). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= +init([CleanSess, ClientId, ClientPid]) -> + if + CleanSess =:= false -> + process_flag(trap_exit, true), + true = link(ClientPid); + CleanSess =:= true -> + ok + end, QEnv = emqttd:env(mqtt, queue), SessEnv = emqttd:env(mqtt, session), - #session{ + PendingQ = emqttd_mqueue:new(ClientId, QEnv), + InflightQ = emqttd_inflight:new(ClientId, emqttd_opts:g(max_inflight, SessEnv)), + Session = #session{ + clean_sess = CleanSess, clientid = ClientId, - clean_sess = true, + client_pid = ClientPid, subscriptions = [], - inflight_window = emqttd_mqwin:new(ClientId, QEnv), - pending_queue = emqttd_mqueue:new(ClientId, QEnv), - awaiting_rel = #{}, + inflight_queue = InflightQ, + pending_queue = PendingQ, + awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, unack_retries = emqttd_opts:g(unack_retries, SessEnv), unack_timeout = emqttd_opts:g(unack_timeout, SessEnv), await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), - expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600 - }. + expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, + timestamp = os:timestamp() + }, + {ok, Session, hibernate}. -%%------------------------------------------------------------------------------ -%% @doc Publish message -%% @end -%%------------------------------------------------------------------------------ --spec publish(session() | pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session() | pid(). -publish(Session, ClientId, {?QOS_0, Message}) -> - %% publish qos0 directly - emqttd_pubsub:publish(ClientId, Message), Session; - -publish(Session, ClientId, {?QOS_1, Message}) -> - %% publish qos1 directly, and client will puback - emqttd_pubsub:publish(ClientId, Message), Session; - -publish(Session = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout, - max_awaiting_rel = MaxLen}, ClientId, - {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> - case maps:size(AwaitingRel) >= MaxLen of - true -> lager:error([{clientid, ClientId}], "Session ~s " - " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]); - false -> - %% store in awaiting_rel - TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), - Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}; - end; -publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), SessPid. - -%%------------------------------------------------------------------------------ -%% @doc PubAck message -%% @end -%%------------------------------------------------------------------------------ - --spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> - case maps:is_key(PacketId, Awaiting) of - true -> ok; - false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId]) - end, - Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; - -puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {puback, {?PUBACK, PacketId}); - -%% PUBREC -puback(Session = #session{clientid = ClientId, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> - case maps:is_key(PacketId, AwaitingAck) of - true -> ok; - false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId]) - end, - Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), - awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; - -puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {puback, {?PUBREC, PacketId}); - -%% PUBREL -puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> - case maps:find(PacketId, Awaiting) of - {ok, {Msg, TRef}} -> - catch erlang:cancel_timer(TRef), - emqttd_pubsub:publish(ClientId, Msg); - error -> - lager:error("Session ~s cannot find PUBREL PacketId '~p'!", [ClientId, PacketId]) - end, - Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; - -puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {puback, {?PUBREL, PacketId}); - -%% PUBCOMP -puback(Session = #session{clientid = ClientId, - awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> - case maps:is_key(PacketId, AwaitingComp) of - true -> ok; - false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId]) - end, - Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; - -puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {puback, {?PUBCOMP, PacketId}); - -wait_ack - -timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> - case maps:find(MsgId, Awaiting) of - {ok, {Msg, _TRef}} -> - lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), - Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}; - error -> - lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]), - Session - end. - -%%------------------------------------------------------------------------------ -%% @doc Subscribe Topics -%% @end -%%------------------------------------------------------------------------------ --spec subscribe(session() | pid(), [{binary(), mqtt_qos()}]) -> {ok, session() | pid(), [mqtt_qos()]}. -subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> +handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), - lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p", + lager:info([{client, ClientId}], "Session ~s subscribe ~p. Granted QoS: ~p", [ClientId, Topics, GrantedQos]), Subscriptions1 = @@ -310,19 +259,9 @@ subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions} [{Topic, Qos} | Acc] end end, Subscriptions, Topics), + {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; - {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos}; - -subscribe(SessPid, Topics) when is_pid(SessPid) -> - {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), - {ok, SessPid, GrantedQos}. - -%%------------------------------------------------------------------------------ -%% @doc Unsubscribe Topics -%% @end -%%------------------------------------------------------------------------------ --spec unsubscribe(session() | pid(), [binary()]) -> {ok, session() | pid()}. -unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> +handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) -> %%unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), @@ -338,63 +277,11 @@ unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscription end end, Subscriptions, Topics), - {ok, Session#session{subscriptions = Subscriptions1}}; + {reply, ok, Session#session{subscriptions = Subscriptions1}}; -unsubscribe(SessPid, Topics) when is_pid(SessPid) -> - gen_server:call(SessPid, {unsubscribe, Topics}), - {ok, SessPid}. - -%%------------------------------------------------------------------------------ -%% @doc Destroy Session -%% @end -%%------------------------------------------------------------------------------ - -% message(qos1) is awaiting ack -await_ack(Msg = #mqtt_message{qos = ?QOS_1}, Session = #session{message_id = MsgId, - inflight_queue = InflightQ, - awaiting_ack = Awaiting, - unack_retry_after = Time, - max_unack_retries = Retries}) -> - %% assign msgid before send - Msg1 = Msg#mqtt_message{msgid = MsgId}, - TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}), - Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting), - {Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1} | InflightQ], - awaiting_ack = Awaiting1})}. - -% message(qos2) is awaiting ack -await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = MsgId, awaiting_ack = Awaiting},) - when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> - %%assign msgid before send - Message1 = Message#mqtt_message{msgid = MsgId, dup = false}, - Message2 = - if - Qos =:= ?QOS_2 -> Message1#mqtt_message{dup = false}; - true -> Message1 - end, - Awaiting1 = maps:put(MsgId, Message2, Awaiting), - {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}. - - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= -init([ClientId, ClientPid]) -> - process_flag(trap_exit, true), - true = link(ClientPid), - Session = emqttd_session:new(ClientId), - {ok, Session#session{clean_sess = false, - client_pid = ClientPid, - timestamp = os:timestamp()}, hibernate}. - - -handle_call({subscribe, Topics}, _From, Session) -> - {ok, NewSession, GrantedQos} = subscribe(Session, Topics), - {reply, {ok, GrantedQos}, NewSession}; - -handle_call({unsubscribe, Topics}, _From, Session) -> - {ok, NewSession} = unsubscribe(Session, Topics), - {reply, ok, NewSession}; +handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) -> + lager:warning("Session ~s destroyed", [ClientId]), + {stop, {shutdown, destroy}, ok, Session}; handle_call(Req, _From, State) -> lager:error("Unexpected Request: ~p", [Req]), @@ -403,10 +290,10 @@ handle_call(Req, _From, State) -> handle_cast({resume, ClientId, ClientPid}, State = #session{ clientid = ClientId, client_pid = OldClientPid, - msg_queue = Queue, + pending_queue = Queue, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, - expire_timer = ETimer}) -> + expired_timer = ETimer}) -> lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), @@ -426,8 +313,8 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{ emqttd_util:cancel_timer(ETimer), %% redelivery PUBREL - lists:foreach(fun(PacketId) -> - ClientPid ! {redeliver, {?PUBREL, PacketId}} + lists:foreach(fun(MsgId) -> + ClientPid ! {redeliver, {?PUBREL, MsgId}} end, maps:keys(AwaitingComp)), %% redelivery messages that awaiting PUBACK or PUBREC @@ -442,45 +329,114 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{ end, emqttd_queue:all(Queue)), {noreply, State#session{client_pid = ClientPid, - msg_queue = emqttd_queue:clear(Queue), - expire_timer = undefined}, hibernate}; + %%TODO: + pending_queue = emqttd_queue:clear(Queue), + expired_timer = undefined}, hibernate}; -handle_cast({publish, ClientId, {?QOS_2, Message}}, Session) -> - {noreply, publish(Session, ClientId, {?QOS_2, Message})}; +handle_cast({publish, Message = #mqtt_message{qos = ?QOS_2}}, Session) -> + {noreply, publish_qos2(Message, Session)}; -handle_cast({puback, {PubAck, PacketId}, Session) -> - {noreply, puback(Session, {PubAck, PacketId})}; -handle_cast({destroy, ClientId}, Session = #session{clientid = ClientId}) -> - lager:warning("Session ~s destroyed", [ClientId]), - {stop, normal, Session}; +handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) -> + case maps:find(MsgId, Awaiting) of + {ok, {_, TRef}} -> + catch erlang:cancel_timer(TRef), + {noreply, dispatch(Session#session{inflight_queue = emqttd_inflight:ack(MsgId, Q), + awaiting_ack = maps:remove(MsgId, Awaiting)})}; + error -> + lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]), + {noreply, Session} + end; + +%% PUBREC +handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + await_rel_timeout = Timeout}) -> + case maps:find(MsgId, AwaitingAck) of + {ok, {_, TRef}} -> + catch erlang:cancel_timer(TRef), + TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}), + {noreply, dispatch(Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck), + awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)})}; + error -> + lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]), + {noreply, Session} + end; + +handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> + case maps:find(MsgId, Awaiting) of + {ok, {Msg, TRef}} -> + catch erlang:cancel_timer(TRef), + emqttd_pubsub:publish(Msg), + {noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}}; + error -> + lager:error("Session ~s cannot find PUBREL'~p'!", [ClientId, MsgId]), + {noreply, Session} + end; + +%% PUBCOMP +handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> + case maps:is_key(MsgId, AwaitingComp) of + true -> + {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}}; + false -> + lager:error("Session ~s cannot find PUBREC MsgId '~p'", [ClientId, MsgId]), + {noreply, Session} + end; handle_cast(Msg, State) -> - lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), + lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), {noreply, State}. -handle_info({dispatch, {_From, Messages}}, Session) when is_list(Messages) -> - F = fun(Message, S) -> dispatch(Message, S) end, - {noreply, lists:foldl(F, Session, Messages)}; +handle_info({dispatch, MsgList}, Session) when is_list(MsgList) -> + NewSession = lists:foldl(fun(Msg, S) -> + dispatch({new, Msg}, S) + end, Session, MsgList), + {noreply, NewSession}; -handle_info({dispatch, {_From, Message}}, State) -> - {noreply, dispatch(Message, State)}; +handle_info({dispatch, {old, Msg}}, Session) when is_record(Msg, mqtt_message) -> + {noreply, dispatch({old, Msg}, Session)}; -handle_info({'EXIT', ClientPid, Reason}, Session = #session{clientid = ClientId, - client_pid = ClientPid}) -> - lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), - {noreply, start_expire_timer(Session#session{client_pid = undefined})}; +handle_info({dispatch, Msg}, Session) when is_record(Msg, mqtt_message) -> + {noreply, dispatch({new, Msg}, Session)}; + +handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, + clientid = ClientId, + client_pid = ClientPid, + expired_after = Expires}) -> + %%TODO: Clean puback, pubrel, pubcomp timers + lager:info("Session ~s: client ~p exited for ~p", [ClientId, ClientPid, Reason]), + TRef = timer(Expires * 1000, session_expired), + {noreply, Session#session{expired_timer = TRef}}; handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> - lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), + lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), {noreply, State}; handle_info(session_expired, State = #session{clientid = ClientId}) -> - lager:warning("Session ~s expired!", [ClientId]), + lager:error("Session ~s expired, shutdown now!", [ClientId]), {stop, {shutdown, expired}, State}; -handle_info({timeout, awaiting_rel, MsgId}, Session) -> - {noreply, timeout(awaiting_rel, MsgId, Session)}; +handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> + case maps:find(MsgId, Awaiting) of + {ok, {Msg, _TRef}} -> + lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), + {noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}}; + error -> + lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]), + {noreply, Session} + end; + +handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = Awaiting}) -> + case maps:find(MsgId, Awaiting) of + {ok, _TRef} -> + lager:error([{client, ClientId}], "Session ~s Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), + {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; + error -> + lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), + {noreply, Session} + end; handle_info(Info, Session) -> lager:critical("Unexpected Info: ~p, Session: ~p", [Info, Session]), @@ -492,53 +448,106 @@ terminate(_Reason, _Session) -> code_change(_OldVsn, Session, _Extra) -> {ok, Session}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @private +%% @doc Plubish Qos2 message from client -> broker, and then wait for pubrel. +%% @end +%%------------------------------------------------------------------------------ + +publish_qos2(Message = #mqtt_message{qos = ?QOS_2,msgid = MsgId}, Session = #session{clientid = ClientId, + awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> + + case check_awaiting_rel(Session) of + true -> + TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}), + Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}; + false -> + lager:error([{clientid, ClientId}], "Session ~s " + " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]), + Session + end. + +check_awaiting_rel(#session{max_awaiting_rel = 0}) -> + true; +check_awaiting_rel(#session{awaiting_rel = AwaitingRel, + max_awaiting_rel = MaxLen}) -> + maps:size(AwaitingRel) < MaxLen. + %%%============================================================================= %%% Dispatch message from broker -> client. %%%============================================================================= +dispatch(Session = #session{client_pid = undefined}) -> + %% do nothing + Session; + +dispatch(Session = #session{pending_queue = PendingQ}) -> + case emqttd_mqueue:out(PendingQ) of + {empty, _Q} -> + Session; + {{value, Msg}, Q1} -> + self() ! {dispatch, {old, Msg}}, + Session#session{pending_queue = Q1} + end. + %% queued the message if client is offline -dispatch(Msg, Session = #session{client_pid = undefined}) -> - queue(Msg, Session); +dispatch({Type, Msg}, Session = #session{client_pid = undefined, + pending_queue= PendingQ}) -> + Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)}; %% dispatch qos0 directly to client process -dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> - ClientPid ! {dispatch, {self(), Msg}}, Session; +dispatch({_Type, Msg} = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> + ClientPid ! {deliver, Msg}, Session; -%% dispatch qos1/2 messages and wait for puback -dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{clientid = ClientId, - message_id = MsgId, - pending_queue = Q, - inflight_window = Win}) - when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> - - case emqttd_mqwin:is_full(InflightWin) of - true -> - lager:error("Session ~s inflight window is full!", [ClientId]), - Session#session{pending_queue = emqttd_mqueue:in(Msg, Q)}; - false -> - Msg1 = Msg#mqtt_message{msgid = MsgId}, - Msg2 = - if - Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false}; - true -> Msg1 - end, - ClientPid ! {dispatch, {self(), Msg2}}, - NewWin = emqttd_mqwin:in(Msg2, Win), - await_ack(Msg2, next_msgid(Session#session{inflight_window = NewWin})) +%% dispatch qos1/2 message and wait for puback +dispatch({Type, Msg = #mqtt_message{qos = Qos}}, Session = #session{clientid = ClientId, + client_pid = ClientPid, + message_id = MsgId, + pending_queue = PendingQ, + inflight_queue= InflightQ}) + when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 -> + %% assign id first + Msg1 = Msg#mqtt_message{msgid = MsgId}, + Msg2 = + if + Qos =:= ?QOS_1 -> Msg1; + Qos =:= ?QOS_2 -> Msg1#mqtt_message{dup = false} + end, + case emqttd_inflight:in(Msg1, InflightQ) of + {error, full} -> + lager:error("Session ~s inflight queue is full!", [ClientId]), + Session#session{pending_queue = emqttd_mqueue:in({Type, Msg}, PendingQ)}; + {ok, InflightQ1} -> + ClientPid ! {deliver, Msg1}, + await_ack(Msg1, next_msgid(Session#session{inflight_queue = InflightQ1})) end. -queue(Msg, Session = #session{pending_queue= Queue}) -> - Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}. +deliver(Msg, Session) -> + ok. -next_msgid(State = #session{message_id = 16#ffff}) -> - State#session{message_id = 1}; +await(Msg, Session) -> + ok. -next_msgid(State = #session{message_id = MsgId}) -> - State#session{message_id = MsgId + 1}. +% message(qos1/2) is awaiting ack +await_ack(Msg = #mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, + unack_retries = Retries, + unack_timeout = Timeout}) -> + + TRef = timer(Timeout * 1000, {retry, MsgId}), + Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting), + Session#session{awaiting_ack = Awaiting1}. -start_expire_timer(Session = #session{expired_after = Expires, - expired_timer = OldTimer}) -> - emqttd_util:cancel_timer(OldTimer), - Timer = erlang:send_after(Expires * 1000, self(), session_expired), - Session#session{expired_timer = Timer}. +timer(Timeout, TimeoutMsg) -> + erlang:send_after(Timeout * 1000, self(), TimeoutMsg). + +next_msgid(Session = #session{message_id = 16#ffff}) -> + Session#session{message_id = 1}; + +next_msgid(Session = #session{message_id = MsgId}) -> + Session#session{message_id = MsgId + 1}. diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index d4f790847..9fe7f8ca3 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -30,7 +30,7 @@ -behavior(supervisor). --export([start_link/0, start_session/2]). +-export([start_link/0, start_session/3]). -export([init/1]). @@ -46,16 +46,17 @@ start_link() -> %% @doc Start a session %% @end %%------------------------------------------------------------------------------ --spec start_session(binary(), pid()) -> {ok, pid()}. -start_session(ClientId, ClientPid) -> - supervisor:start_child(?MODULE, [ClientId, ClientPid]). +-spec start_session(boolean(), binary(), pid()) -> {ok, pid()}. +start_session(CleanSess, ClientId, ClientPid) -> + supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]). %%%============================================================================= %%% Supervisor callbacks %%%============================================================================= init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{session, {emqttd_session_proc, start_link, []}, - transient, 10000, worker, [emqttd_session_proc]}]}}. + {ok, {{simple_one_for_one, 0, 1}, + [{session, {emqttd_session, start_link, []}, + transient, 10000, worker, [emqttd_session]}]}}. + diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index 56c0ea9f9..f1e7041f2 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -90,20 +90,10 @@ table() -> ?SESSION_TAB. %% @end %%------------------------------------------------------------------------------ --spec start_session(CleanSess :: boolean(), binary()) -> {ok, module(), record() | pid()}. -start_session(true, ClientId) -> - %% destroy old session if existed - ok = destroy_session(ClientId), - {ok, emqttd_session, emqttd_session:new(ClientId)}; - -start_session(false, ClientId) -> - SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), - case call(SmPid, {start_session, ClientId, self()}) of - {ok, SessPid} -> - {ok, emqttd_session_proc, SessPid}; - {error, Error} -> - {error, Error} - end. +-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}. +start_session(CleanSess, ClientId) -> + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + call(SM, {start_session, {CleanSess, ClientId, self()}}). %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -122,10 +112,10 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> - SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), - call(SmPid, {destroy_session, ClientId}). + SM = gproc_pool:pick_worker(?SM_POOL, ClientId), + call(SM, {destroy_session, ClientId}). -call(SmPid, Req) -> gen_server:call(SmPid, Req). +call(SM, Req) -> gen_server:call(SM, Req). %%%============================================================================= %%% gen_server callbacks @@ -135,23 +125,27 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({start_session, ClientId, ClientPid}, _From, State) -> +handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> Reply = case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> - emqttd_session_proc:resume(SessPid, ClientId, ClientPid), + emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> - case emqttd_session_sup:start_session(ClientId, ClientPid) of - {ok, SessPid} -> - ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), - {ok, SessPid}; - {error, Error} -> - {error, Error} - end + new_session(false, ClientId, ClientPid) end, {reply, Reply, setstats(State)}; +handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> + case ets:lookup(?SESSION_TAB, ClientId) of + [{_, SessPid, MRef}] -> + erlang:demonitor(MRef, [flush]), + emqttd_session:destroy_session(SessPid, ClientId); + [] -> + ok + end, + {reply, new_session(true, ClientId, ClientPid), setstats(State)}; + handle_call({destroy_session, ClientId}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> @@ -186,6 +180,16 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +new_session(CleanSess, ClientId, ClientPid) -> + case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of + {ok, SessPid} -> + ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), + {ok, SessPid}; + {error, Error} -> + {error, Error} + end. + setstats(State = #state{statsfun = StatsFun}) -> StatsFun(ets:info(?SESSION_TAB, size)), State. + diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl index 52a7ba7d9..ef7a3ad41 100644 --- a/apps/emqttd/src/emqttd_ws_client.erl +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -130,15 +130,8 @@ handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) -> - ProtoState1 = - lists:foldl(fun(Message, PState) -> - {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 - end, ProtoState, Messages), - {noreply, State#state{proto_state = ProtoState1}}; - -handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), +handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) -> diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 9995ddea1..e72a06862 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -90,6 +90,10 @@ %% Expired after 2 days {expired_after, 48}, + %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. + %% 0 means no limit + {max_inflight, 100}, + %% Max retries for unack Qos1/2 messages {unack_retries, 3}, @@ -99,16 +103,14 @@ %% Awaiting PUBREL Timeout {await_rel_timeout, 8}, - %% Max Packets that Awaiting PUBREL - {max_awaiting_rel, 100} + %% Max Packets that Awaiting PUBREL, 0 means no limit + {max_awaiting_rel, 0} + ]}, {queue, [ %% Max queue length {max_length, 1000}, - %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. - {inflight_window, 100}, - %% Low watermark of queued messsages {low_watermark, 0.2},