diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 44bc76bb4..4a1f44309 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -20,8 +20,26 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc +%%% %%% emqttd session. %%% +%%% Session State in the broker consists of: +%%% +%%% 1. The Client’s subscriptions. +%%% +%%% 2. inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2 +%%% messages which have been sent to the Client, but have not been completely +%%% acknowledged. +%%% +%%% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2 +%%% messages which have been received from the Client, but have not been +%%% completely acknowledged. +%%% +%%% 4. all qos1, qos2 messages published to when client is disconnected. +%%% QoS 1 and QoS 2 messages pending transmission to the Client. +%%% +%%% 5. Optionally, QoS 0 messages pending transmission to the Client. +%%% %%% @end %%%----------------------------------------------------------------------------- @@ -35,8 +53,6 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). --define(SessProc, emqttd_session_proc). - %% Session Managenent APIs -export([start/1, resume/3, @@ -104,6 +120,10 @@ -type session() :: #session{}. +-export_type([session/0]). + +-define(SESSION(Sess), is_record(Sess, session)). + %%%============================================================================= %%% Session API %%%============================================================================= @@ -116,23 +136,15 @@ start({true = _CleanSess, ClientId, _ClientPid}) -> %%Destroy old session if CleanSess is true before. ok = emqttd_sm:destroy_session(ClientId), - {ok, initial_state(ClientId)}; - -start({false = _CleanSess, ClientId, ClientPid}) -> - {ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid), - {ok, SessPid}. + {ok, initial_state(ClientId)}. %%------------------------------------------------------------------------------ %% @doc Resume Session %% @end %%------------------------------------------------------------------------------ -spec resume(session(), binary(), pid()) -> session(). -resume(SessState = #session{}, _ClientId, _ClientPid) -> - SessState; -resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> - ?SessProc: - gen_server:cast(SessPid, {resume, ClientId, ClientPid}), - SessPid. +resume(Session = #session{}, _ClientId, _ClientPid) -> + Session. %%------------------------------------------------------------------------------ %% @doc Publish message @@ -145,47 +157,38 @@ publish(Session, ClientId, {?QOS_0, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) -> emqttd_pubsub:publish(ClientId, Message), Session; -publish(SessState = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}, _ClientId, +publish(Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> %% store in awaiting_rel TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), - SessState#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}; - -publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), - SessPid. + Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}. %%------------------------------------------------------------------------------ %% @doc PubAck message %% @end %%------------------------------------------------------------------------------ -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -puback(SessState = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> +puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> case maps:is_key(PacketId, Awaiting) of true -> ok; false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId]) end, - SessState#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; -puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {puback, PacketId}), SessPid; + Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; %% PUBREC -puback(SessState = #session{clientid = ClientId, +puback(Session = #session{clientid = ClientId, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> case maps:is_key(PacketId, AwaitingAck) of true -> ok; false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId]) end, - SessState#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), + Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; -puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; - %% PUBREL -puback(SessState = #session{clientid = ClientId, +puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of {ok, {Msg, TRef}} -> @@ -194,31 +197,25 @@ puback(SessState = #session{clientid = ClientId, error -> lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, - SessState#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; - -puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {pubrel, PacketId}), SessPid; + Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; %% PUBCOMP -puback(SessState = #session{clientid = ClientId, +puback(Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> case maps:is_key(PacketId, AwaitingComp) of true -> ok; false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId]) end, - SessState#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; + Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; -puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. - -timeout(awaiting_rel, MsgId, SessState = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> +timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> case maps:find(MsgId, Awaiting) of {ok, {Msg, _TRef}} -> lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), - SessState#session{awaiting_rel = maps:remove(MsgId, Awaiting)}; + Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}; error -> lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]), - SessState + Session end. %%------------------------------------------------------------------------------ @@ -226,7 +223,7 @@ timeout(awaiting_rel, MsgId, SessState = #session{clientid = ClientId, awaiting_ %% @end %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -subscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> +subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), @@ -253,18 +250,14 @@ subscribe(SessState = #session{clientid = ClientId, subscriptions = Subscription end end, Subscriptions, Topics), - {ok, SessState#session{subscriptions = Subscriptions1}, GrantedQos}; - -subscribe(SessPid, Topics) when is_pid(SessPid) -> - {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), - {ok, SessPid, GrantedQos}. + {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos}; %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topics %% @end %%------------------------------------------------------------------------------ -spec unsubscribe(session(), [binary()]) -> {ok, session()}. -unsubscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> +unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %%unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), @@ -280,22 +273,15 @@ unsubscribe(SessState = #session{clientid = ClientId, subscriptions = Subscripti end end, Subscriptions, Topics), - {ok, SessState#session{subscriptions = Subscriptions1}}; - -unsubscribe(SessPid, Topics) when is_pid(SessPid) -> - gen_server:call(SessPid, {unsubscribe, Topics}), - {ok, SessPid}. + {ok, Session#session{subscriptions = Subscriptions1}}; %%------------------------------------------------------------------------------ %% @doc Destroy Session %% @end %%------------------------------------------------------------------------------ --spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. -destroy(SessPid, ClientId) when is_pid(SessPid) -> - gen_server:cast(SessPid, {destroy, ClientId}). % message(qos1) is awaiting ack -await_ack(Msg = #mqtt_message{qos = ?QOS_1}, SessState = #session{message_id = MsgId, +await_ack(Msg = #mqtt_message{qos = ?QOS_1}, Session = #session{message_id = MsgId, inflight_queue = InflightQ, awaiting_ack = Awaiting, unack_retry_after = Time, @@ -304,11 +290,11 @@ await_ack(Msg = #mqtt_message{qos = ?QOS_1}, SessState = #session{message_id = M Msg1 = Msg#mqtt_message{msgid = MsgId}, TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}), Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting), - {Msg1, next_msgid(SessState#session{inflight_queue = [{MsgId, Msg1} | InflightQ], + {Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1} | InflightQ], awaiting_ack = Awaiting1})}. % message(qos2) is awaiting ack -await_ack(Message = #mqtt_message{qos = Qos}, SessState = #session{message_id = MsgId, awaiting_ack = Awaiting},) +await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = MsgId, awaiting_ack = Awaiting},) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> %%assign msgid before send Message1 = Message#mqtt_message{msgid = MsgId, dup = false}, @@ -318,7 +304,7 @@ await_ack(Message = #mqtt_message{qos = Qos}, SessState = #session{message_id = true -> Message1 end, Awaiting1 = maps:put(MsgId, Message2, Awaiting), - {Message1, next_msgid(SessState#session{awaiting_ack = Awaiting1})}. + {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}. initial_state(ClientId) -> %%TODO: init session options. @@ -334,36 +320,35 @@ initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), State#session{client_pid = ClientPid}. - %%%============================================================================= %%% Internal functions %%%============================================================================= %% client is offline -dispatch(Msg, SessState = #session{client_pid = undefined}) -> - queue(Msg, SessState); +dispatch(Msg, Session = #session{client_pid = undefined}) -> + queue(Msg, Session); %% dispatch qos0 directly -dispatch(Msg = #mqtt_message{qos = ?QOS_0}, SessState = #session{client_pid = ClientPid}) -> - ClientPid ! {dispatch, {self(), Msg}}, SessState; +dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> + ClientPid ! {dispatch, {self(), Msg}}, Session; %% queue if inflight_queue is full -dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{inflight_window = InflightWin, +dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{inflight_window = InflightWin, inflight_queue = InflightQ}) when (Qos > ?QOS_0) andalso (length(InflightQ) >= InflightWin) -> %%TODO: set alarms lager:error([{clientid, ClientId}], "Session ~s inflight_queue is full!", [ClientId]), - queue(Msg, SessState); + queue(Msg, Session); %% dispatch and await ack -dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{client_pid = ClientPid}) +dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{client_pid = ClientPid}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> %% assign msgid and await - {NewMsg, NewState} = await_ack(Msg, SessState), + {NewMsg, NewState} = await_ack(Msg, Session), ClientPid ! {dispatch, {self(), NewMsg}}, -queue(Msg, SessState = #session{pending_queue = Queue}) -> - SessState#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}. +queue(Msg, Session = #session{pending_queue = Queue}) -> + Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}. next_msgid(State = #session{message_id = 16#ffff}) -> State#session{message_id = 1}; diff --git a/apps/emqttd/src/emqttd_session_proc.erl b/apps/emqttd/src/emqttd_session_proc.erl index 43b60b75e..04d65563b 100644 --- a/apps/emqttd/src/emqttd_session_proc.erl +++ b/apps/emqttd/src/emqttd_session_proc.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd session process. +%%% emqttd session process of persistent client. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -42,6 +42,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% Refactor this API. +start({false = _CleanSess, ClientId, ClientPid}) -> + {ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid), + {ok, SessPid}. + %%------------------------------------------------------------------------------ %% @doc Start a session process. %% @end @@ -49,6 +54,44 @@ start_link(ClientId, ClientPid) -> gen_server:start_link(?MODULE, [ClientId, ClientPid], []). +resume(SessProc, ClientId, ClientPid) when is_pid(SessProc) -> + cast(SessProc, {resume, ClientId, ClientPid}). + +-spec publish(pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> pid(). +publish(SessProc, ClientId, {?QOS_0, Message}) when is_pid(SessProc) -> + emqttd_pubsub:publish(ClientId, Message), Session; + +publish(SessProc, ClientId, {?QOS_1, Message}) when is_pid(SessProc) -> + emqttd_pubsub:publish(ClientId, Message), Session; + +publish(SessProc, ClientId, {?QOS_2, Message}) when is_pid(SessProc) -> + cast(SessProc, {publish, ClientId, {?QOS_2, Message}}). + +puback(SessProc, {?PUBACK, PacketId}) when is_pid(SessProc) -> + cast(SessProc, {puback, PacketId}). + +puback(SessProc, {?PUBREL, PacketId}) when is_pid(SessProc) -> + cast(SessPid, {pubrel, PacketId}). + +puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> + cast(SessPid, {pubcomp, PacketId}). + +subscribe(SessPid, Topics) when is_pid(SessPid) -> + {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), + {ok, SessPid, GrantedQos}. + +unsubscribe(SessPid, Topics) when is_pid(SessPid) -> + gen_server:call(SessPid, {unsubscribe, Topics}), + {ok, SessPid}. + +-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. +destroy(SessPid, ClientId) when is_pid(SessPid) -> + gen_server:cast(SessPid, {destroy, ClientId}). + +cast(SessProc, Msg) -> + gen_server:cast(SessProc, Msg), SessProc. + + %%%============================================================================= %%% gen_server callbacks %%%=============================================================================