This commit is contained in:
Feng Lee 2015-06-12 18:38:26 +08:00
parent 68968a572f
commit c293ccab13
2 changed files with 101 additions and 73 deletions

View File

@ -20,8 +20,26 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%%
%%% emqttd session. %%% emqttd session.
%%% %%%
%%% Session State in the broker consists of:
%%%
%%% 1. The Clients subscriptions.
%%%
%%% 2. inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2
%%% messages which have been sent to the Client, but have not been completely
%%% acknowledged.
%%%
%%% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2
%%% messages which have been received from the Client, but have not been
%%% completely acknowledged.
%%%
%%% 4. all qos1, qos2 messages published to when client is disconnected.
%%% QoS 1 and QoS 2 messages pending transmission to the Client.
%%%
%%% 5. Optionally, QoS 0 messages pending transmission to the Client.
%%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -35,8 +53,6 @@
-include_lib("emqtt/include/emqtt_packet.hrl"). -include_lib("emqtt/include/emqtt_packet.hrl").
-define(SessProc, emqttd_session_proc).
%% Session Managenent APIs %% Session Managenent APIs
-export([start/1, -export([start/1,
resume/3, resume/3,
@ -104,6 +120,10 @@
-type session() :: #session{}. -type session() :: #session{}.
-export_type([session/0]).
-define(SESSION(Sess), is_record(Sess, session)).
%%%============================================================================= %%%=============================================================================
%%% Session API %%% Session API
%%%============================================================================= %%%=============================================================================
@ -116,23 +136,15 @@
start({true = _CleanSess, ClientId, _ClientPid}) -> start({true = _CleanSess, ClientId, _ClientPid}) ->
%%Destroy old session if CleanSess is true before. %%Destroy old session if CleanSess is true before.
ok = emqttd_sm:destroy_session(ClientId), ok = emqttd_sm:destroy_session(ClientId),
{ok, initial_state(ClientId)}; {ok, initial_state(ClientId)}.
start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid),
{ok, SessPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Resume Session %% @doc Resume Session
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(session(), binary(), pid()) -> session(). -spec resume(session(), binary(), pid()) -> session().
resume(SessState = #session{}, _ClientId, _ClientPid) -> resume(Session = #session{}, _ClientId, _ClientPid) ->
SessState; Session.
resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
?SessProc:
gen_server:cast(SessPid, {resume, ClientId, ClientPid}),
SessPid.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish message %% @doc Publish message
@ -145,47 +157,38 @@ publish(Session, ClientId, {?QOS_0, Message}) ->
publish(Session, ClientId, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
emqttd_pubsub:publish(ClientId, Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessState = #session{awaiting_rel = AwaitingRel, publish(Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}, _ClientId, await_rel_timeout = Timeout}, _ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
%% store in awaiting_rel %% store in awaiting_rel
TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}),
SessState#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}; Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}.
publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}),
SessPid.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc PubAck message %% @doc PubAck message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
puback(SessState = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
case maps:is_key(PacketId, Awaiting) of case maps:is_key(PacketId, Awaiting) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {puback, PacketId}), SessPid;
%% PUBREC %% PUBREC
puback(SessState = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) ->
case maps:is_key(PacketId, AwaitingAck) of case maps:is_key(PacketId, AwaitingAck) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck),
awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; awaiting_comp = maps:put(PacketId, true, AwaitingComp)};
puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
%% PUBREL %% PUBREL
puback(SessState = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
@ -194,31 +197,25 @@ puback(SessState = #session{clientid = ClientId,
error -> error ->
lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId]) lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrel, PacketId}), SessPid;
%% PUBCOMP %% PUBCOMP
puback(SessState = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) ->
case maps:is_key(PacketId, AwaitingComp) of case maps:is_key(PacketId, AwaitingComp) of
true -> ok; true -> ok;
false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId]) false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId])
end, end,
SessState#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)};
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid.
timeout(awaiting_rel, MsgId, SessState = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]), lager:error([{client, ClientId}], "Session ~s Awaiting Rel Timout!~nDrop Message:~p", [ClientId, Msg]),
SessState#session{awaiting_rel = maps:remove(MsgId, Awaiting)}; Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)};
error -> error ->
lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]), lager:error([{client, ClientId}], "Session ~s Cannot find Awaiting Rel: MsgId=~p", [ClientId, MsgId]),
SessState Session
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -226,7 +223,7 @@ timeout(awaiting_rel, MsgId, SessState = #session{clientid = ClientId, awaiting_
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
@ -253,18 +250,14 @@ subscribe(SessState = #session{clientid = ClientId, subscriptions = Subscription
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
{ok, SessState#session{subscriptions = Subscriptions1}, GrantedQos}; {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
{ok, SessPid, GrantedQos}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unsubscribe Topics %% @doc Unsubscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}. -spec unsubscribe(session(), [binary()]) -> {ok, session()}.
unsubscribe(SessState = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%%unsubscribe from topic tree %%unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics), ok = emqttd_pubsub:unsubscribe(Topics),
@ -280,22 +273,15 @@ unsubscribe(SessState = #session{clientid = ClientId, subscriptions = Subscripti
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
{ok, SessState#session{subscriptions = Subscriptions1}}; {ok, Session#session{subscriptions = Subscriptions1}};
unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}),
{ok, SessPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Destroy Session %% @doc Destroy Session
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}).
% message(qos1) is awaiting ack % message(qos1) is awaiting ack
await_ack(Msg = #mqtt_message{qos = ?QOS_1}, SessState = #session{message_id = MsgId, await_ack(Msg = #mqtt_message{qos = ?QOS_1}, Session = #session{message_id = MsgId,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = Awaiting, awaiting_ack = Awaiting,
unack_retry_after = Time, unack_retry_after = Time,
@ -304,11 +290,11 @@ await_ack(Msg = #mqtt_message{qos = ?QOS_1}, SessState = #session{message_id = M
Msg1 = Msg#mqtt_message{msgid = MsgId}, Msg1 = Msg#mqtt_message{msgid = MsgId},
TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}), TRef = erlang:send_after(Time * 1000, self(), {retry, MsgId}),
Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting), Awaiting1 = maps:put(MsgId, {TRef, Retries, Time}, Awaiting),
{Msg1, next_msgid(SessState#session{inflight_queue = [{MsgId, Msg1} | InflightQ], {Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1} | InflightQ],
awaiting_ack = Awaiting1})}. awaiting_ack = Awaiting1})}.
% message(qos2) is awaiting ack % message(qos2) is awaiting ack
await_ack(Message = #mqtt_message{qos = Qos}, SessState = #session{message_id = MsgId, awaiting_ack = Awaiting},) await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = MsgId, awaiting_ack = Awaiting},)
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
%%assign msgid before send %%assign msgid before send
Message1 = Message#mqtt_message{msgid = MsgId, dup = false}, Message1 = Message#mqtt_message{msgid = MsgId, dup = false},
@ -318,7 +304,7 @@ await_ack(Message = #mqtt_message{qos = Qos}, SessState = #session{message_id =
true -> Message1 true -> Message1
end, end,
Awaiting1 = maps:put(MsgId, Message2, Awaiting), Awaiting1 = maps:put(MsgId, Message2, Awaiting),
{Message1, next_msgid(SessState#session{awaiting_ack = Awaiting1})}. {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}.
initial_state(ClientId) -> initial_state(ClientId) ->
%%TODO: init session options. %%TODO: init session options.
@ -334,36 +320,35 @@ initial_state(ClientId, ClientPid) ->
State = initial_state(ClientId), State = initial_state(ClientId),
State#session{client_pid = ClientPid}. State#session{client_pid = ClientPid}.
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
%% client is offline %% client is offline
dispatch(Msg, SessState = #session{client_pid = undefined}) -> dispatch(Msg, Session = #session{client_pid = undefined}) ->
queue(Msg, SessState); queue(Msg, Session);
%% dispatch qos0 directly %% dispatch qos0 directly
dispatch(Msg = #mqtt_message{qos = ?QOS_0}, SessState = #session{client_pid = ClientPid}) -> dispatch(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
ClientPid ! {dispatch, {self(), Msg}}, SessState; ClientPid ! {dispatch, {self(), Msg}}, Session;
%% queue if inflight_queue is full %% queue if inflight_queue is full
dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{inflight_window = InflightWin, dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{inflight_window = InflightWin,
inflight_queue = InflightQ}) inflight_queue = InflightQ})
when (Qos > ?QOS_0) andalso (length(InflightQ) >= InflightWin) -> when (Qos > ?QOS_0) andalso (length(InflightQ) >= InflightWin) ->
%%TODO: set alarms %%TODO: set alarms
lager:error([{clientid, ClientId}], "Session ~s inflight_queue is full!", [ClientId]), lager:error([{clientid, ClientId}], "Session ~s inflight_queue is full!", [ClientId]),
queue(Msg, SessState); queue(Msg, Session);
%% dispatch and await ack %% dispatch and await ack
dispatch(Msg = #mqtt_message{qos = Qos}, SessState = #session{client_pid = ClientPid}) dispatch(Msg = #mqtt_message{qos = Qos}, Session = #session{client_pid = ClientPid})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
%% assign msgid and await %% assign msgid and await
{NewMsg, NewState} = await_ack(Msg, SessState), {NewMsg, NewState} = await_ack(Msg, Session),
ClientPid ! {dispatch, {self(), NewMsg}}, ClientPid ! {dispatch, {self(), NewMsg}},
queue(Msg, SessState = #session{pending_queue = Queue}) -> queue(Msg, Session = #session{pending_queue = Queue}) ->
SessState#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}. Session#session{pending_queue = emqttd_mqueue:in(Msg, Queue)}.
next_msgid(State = #session{message_id = 16#ffff}) -> next_msgid(State = #session{message_id = 16#ffff}) ->
State#session{message_id = 1}; State#session{message_id = 1};

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd session process. %%% emqttd session process of persistent client.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -42,6 +42,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% Refactor this API.
start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid),
{ok, SessPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a session process. %% @doc Start a session process.
%% @end %% @end
@ -49,6 +54,44 @@
start_link(ClientId, ClientPid) -> start_link(ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [ClientId, ClientPid], []). gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
resume(SessProc, ClientId, ClientPid) when is_pid(SessProc) ->
cast(SessProc, {resume, ClientId, ClientPid}).
-spec publish(pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> pid().
publish(SessProc, ClientId, {?QOS_0, Message}) when is_pid(SessProc) ->
emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessProc, ClientId, {?QOS_1, Message}) when is_pid(SessProc) ->
emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessProc, ClientId, {?QOS_2, Message}) when is_pid(SessProc) ->
cast(SessProc, {publish, ClientId, {?QOS_2, Message}}).
puback(SessProc, {?PUBACK, PacketId}) when is_pid(SessProc) ->
cast(SessProc, {puback, PacketId}).
puback(SessProc, {?PUBREL, PacketId}) when is_pid(SessProc) ->
cast(SessPid, {pubrel, PacketId}).
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubcomp, PacketId}).
subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
{ok, SessPid, GrantedQos}.
unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}),
{ok, SessPid}.
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}).
cast(SessProc, Msg) ->
gen_server:cast(SessProc, Msg), SessProc.
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================