merge session

This commit is contained in:
Feng Lee 2015-06-13 12:09:08 +08:00
parent 5e0bf3d831
commit 9c666cef70
6 changed files with 271 additions and 321 deletions

View File

@ -152,13 +152,13 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
emqttd_cm:register(client(State2)), emqttd_cm:register(client(State2)),
%%Starting session %%Starting session
{ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}), {ok, SessMod, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; {?CONNACK_ACCEPT, State2#proto_state{sessmod = SessMod, session = Session, will_msg = willmsg(Var)}};
{error, Reason}-> {error, Reason}->
lager:error("~s@~s: username '~s', login failed - ~s", lager:error("~s@~s: username '~s', login failed - ~s",
[ClientId, emqttd_net:format(Peername), Username, Reason]), [ClientId, emqttd_net:format(Peername), Username, Reason]),

View File

@ -21,7 +21,7 @@
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% %%%
%%% emqttd session. %%% emqttd session for persistent client.
%%% %%%
%%% Session State in the broker consists of: %%% Session State in the broker consists of:
%%% %%%
@ -53,10 +53,11 @@
-include_lib("emqtt/include/emqtt_packet.hrl"). -include_lib("emqtt/include/emqtt_packet.hrl").
%% Session Managenent APIs %% Start gen_server
-export([start/1, -export([start_link/2, resume/3, destroy/2]).
resume/3,
destroy/2]). %% Init Session State
-export([new/1]).
%% PubSub APIs %% PubSub APIs
-export([publish/3, -export([publish/3,
@ -66,10 +67,17 @@
await/2, await/2,
dispatch/2]). dispatch/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(session, { -record(session, {
%% ClientId: Identifier of Session %% ClientId: Identifier of Session
clientid :: binary(), clientid :: binary(),
%% Clean Session Flag
clean_sess = true,
%% Client Pid linked with session %% Client Pid linked with session
client_pid :: pid(), client_pid :: pid(),
@ -111,63 +119,103 @@
%% Awaiting PUBREL timeout %% Awaiting PUBREL timeout
await_rel_timeout = 8, await_rel_timeout = 8,
%% session expired after 48 hours %% Max Packets that Awaiting PUBREL
sess_expired_after = 172800, max_awaiting_rel = 100,
sess_expired_timer, %% session expired after 48 hours
expired_after = 172800,
expired_timer,
timestamp}). timestamp}).
-type session() :: #session{}. -type session() :: #session{}.
-export_type([session/0]).
-define(SESSION(Sess), is_record(Sess, session)).
%%%============================================================================= %%%=============================================================================
%%% Session API %%% Session API
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start Session %% @doc Start a session process.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start({boolean(), binary(), pid()}) -> {ok, session()}. start_link(ClientId, ClientPid) ->
start({true = _CleanSess, ClientId, _ClientPid}) -> gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
%%Destroy old session if CleanSess is true before.
ok = emqttd_sm:destroy_session(ClientId),
{ok, initial_state(ClientId)}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Resume Session %% @doc Resume a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(session(), binary(), pid()) -> session(). resume(Session, _ClientId, _ClientPid) when is_record(Session, session) ->
resume(Session = #session{}, _ClientId, _ClientPid) -> Session;
Session. resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid.
%%------------------------------------------------------------------------------
%% @doc Destroy a session.
%% @end
%%------------------------------------------------------------------------------
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}), SessPid.
%%------------------------------------------------------------------------------
%% @doc Init Session State.
%% @end
%%------------------------------------------------------------------------------
-spec new(binary()) -> session().
new(ClientId) ->
QEnv = emqttd:env(mqtt, queue),
SessEnv = emqttd:env(mqtt, session),
#session{
clientid = ClientId,
clean_sess = true,
subscriptions = [],
inflight_window = emqttd_mqwin:new(ClientId, QEnv),
pending_queue = emqttd_mqueue:new(ClientId, QEnv),
awaiting_rel = #{},
awaiting_ack = #{},
awaiting_comp = #{},
unack_retries = emqttd_opts:g(unack_retries, SessEnv),
unack_timeout = emqttd_opts:g(unack_timeout, SessEnv),
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600
}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Publish message %% @doc Publish message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). -spec publish(session() | pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session() | pid().
publish(Session, ClientId, {?QOS_0, Message}) -> publish(Session, ClientId, {?QOS_0, Message}) ->
%% publish qos0 directly
emqttd_pubsub:publish(ClientId, Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(Session, ClientId, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
%% publish qos1 directly, and client will puback
emqttd_pubsub:publish(ClientId, Message), Session; emqttd_pubsub:publish(ClientId, Message), Session;
publish(Session = #session{awaiting_rel = AwaitingRel, publish(Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}, _ClientId, await_rel_timeout = Timeout,
max_awaiting_rel = MaxLen}, ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
case maps:size(AwaitingRel) >= MaxLen of
true -> lager:error([{clientid, ClientId}], "Session ~s "
" dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]);
false ->
%% store in awaiting_rel %% store in awaiting_rel
TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}),
Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}. Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)};
end;
publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), SessPid.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc PubAck message %% @doc PubAck message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
case maps:is_key(PacketId, Awaiting) of case maps:is_key(PacketId, Awaiting) of
@ -176,6 +224,9 @@ puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBAC
end, end,
Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {puback, PacketId});
%% PUBREC %% PUBREC
puback(Session = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
@ -187,18 +238,23 @@ puback(Session = #session{clientid = ClientId,
Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck),
awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; awaiting_comp = maps:put(PacketId, true, AwaitingComp)};
puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
%% PUBREL %% PUBREL
puback(Session = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
catch erlang:cancel_timer(TRef), catch erlang:cancel_timer(TRef),
emqttd_pubsub:publish(ClientId, Msg); emqttd_pubsub:publish(ClientId, Msg);
error -> error ->
lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId]) lager:error("Session ~s cannot find PUBREL PacketId '~p'!", [ClientId, PacketId])
end, end,
Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)};
puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubrel, PacketId});
%% PUBCOMP %% PUBCOMP
puback(Session = #session{clientid = ClientId, puback(Session = #session{clientid = ClientId,
awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) ->
@ -208,6 +264,9 @@ puback(Session = #session{clientid = ClientId,
end, end,
Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)};
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubcomp, PacketId}).
timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
@ -222,7 +281,7 @@ timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_re
%% @doc Subscribe Topics %% @doc Subscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -spec subscribe(session() | pid(), [{binary(), mqtt_qos()}]) -> {ok, session() | pid(), [mqtt_qos()]}.
subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
@ -252,11 +311,15 @@ subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}
{ok, Session#session{subscriptions = Subscriptions1}, GrantedQos}; {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
{ok, SessPid, GrantedQos}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unsubscribe Topics %% @doc Unsubscribe Topics
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}. -spec unsubscribe(session() | pid(), [binary()]) -> {ok, session() | pid()}.
unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) ->
%%unsubscribe from topic tree %%unsubscribe from topic tree
@ -275,6 +338,10 @@ unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscription
{ok, Session#session{subscriptions = Subscriptions1}}; {ok, Session#session{subscriptions = Subscriptions1}};
unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}),
{ok, SessPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Destroy Session %% @doc Destroy Session
%% @end %% @end
@ -306,19 +373,140 @@ await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = Ms
Awaiting1 = maps:put(MsgId, Message2, Awaiting), Awaiting1 = maps:put(MsgId, Message2, Awaiting),
{Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}. {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}.
initial_state(ClientId) ->
%%TODO: init session options.
#session{clientid = ClientId,
subscriptions = [],
inflight_queue = [],
awaiting_queue = [],
awaiting_ack = #{},
awaiting_rel = #{},
awaiting_comp = #{}}.
initial_state(ClientId, ClientPid) -> %%%=============================================================================
State = initial_state(ClientId), %%% gen_server callbacks
State#session{client_pid = ClientPid}. %%%=============================================================================
init([ClientId, ClientPid]) ->
process_flag(trap_exit, true),
true = link(ClientPid),
Session = emqttd_session:new(ClientId),
{ok, Session#session{clean_sess = false,
client_pid = ClientPid,
timestamp = os:timestamp()}, hibernate}.
handle_call({subscribe, Topics}, _From, Session) ->
{ok, NewSession, GrantedQos} = subscribe(Session, Topics),
{reply, {ok, GrantedQos}, NewSession};
handle_call({unsubscribe, Topics}, _From, Session) ->
{ok, NewSession} = unsubscribe(Session, Topics),
{reply, ok, NewSession};
handle_call(Req, _From, State) ->
lager:error("Unexpected Request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session{
clientid = ClientId,
client_pid = OldClientPid,
msg_queue = Queue,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
%% kick old client...
if
OldClientPid =:= undefined ->
ok;
OldClientPid =:= ClientPid ->
ok;
true ->
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
unlink(OldClientPid),
OldClientPid ! {stop, duplicate_id, ClientPid}
end,
%% cancel timeout timer
emqttd_util:cancel_timer(ETimer),
%% redelivery PUBREL
lists:foreach(fun(PacketId) ->
ClientPid ! {redeliver, {?PUBREL, PacketId}}
end, maps:keys(AwaitingComp)),
%% redelivery messages that awaiting PUBACK or PUBREC
Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end,
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Dup(Msg)}}
end, maps:values(AwaitingAck)),
%% send offline messages
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Msg}}
end, emqttd_queue:all(Queue)),
{noreply, State#session{client_pid = ClientPid,
msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate};
handle_cast({publish, ClientId, {?QOS_2, Message}}, State) ->
NewState = publish(State, ClientId, {?QOS_2, Message}),
{noreply, NewState};
handle_cast({puback, PacketId}, State) ->
NewState = puback(State, {?PUBACK, PacketId}),
{noreply, NewState};
handle_cast({pubrec, PacketId}, State) ->
NewState = puback(State, {?PUBREC, PacketId}),
{noreply, NewState};
handle_cast({pubrel, PacketId}, State) ->
NewState = puback(State, {?PUBREL, PacketId}),
{noreply, NewState};
handle_cast({pubcomp, PacketId}, State) ->
NewState = puback(State, {?PUBCOMP, PacketId}),
{noreply, NewState};
handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]),
{stop, normal, State};
handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
{noreply, State}.
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
F = fun(Message, S) -> dispatch(Message, S) end,
{noreply, lists:foldl(F, State, Messages)};
handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId,
client_pid = ClientPid}) ->
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
{noreply, start_expire_timer(State#session{client_pid = undefined})};
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
{noreply, State};
handle_info(session_expired, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s expired!", [ClientId]),
{stop, {shutdown, expired}, State};
handle_info({timeout, awaiting_rel, MsgId}, SessState) ->
NewState = timeout(awaiting_rel, MsgId, SessState),
{noreply, NewState};
handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Internal functions

View File

@ -1,255 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd session process of persistent client.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_session_proc).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
%% Start gen_server
-export([start_link/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% Refactor this API.
start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid),
{ok, SessPid}.
%%------------------------------------------------------------------------------
%% @doc Start a session process.
%% @end
%%------------------------------------------------------------------------------
start_link(ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
resume(SessProc, ClientId, ClientPid) when is_pid(SessProc) ->
cast(SessProc, {resume, ClientId, ClientPid}).
-spec publish(pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> pid().
publish(SessProc, ClientId, {?QOS_0, Message}) when is_pid(SessProc) ->
emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessProc, ClientId, {?QOS_1, Message}) when is_pid(SessProc) ->
emqttd_pubsub:publish(ClientId, Message), Session;
publish(SessProc, ClientId, {?QOS_2, Message}) when is_pid(SessProc) ->
cast(SessProc, {publish, ClientId, {?QOS_2, Message}}).
puback(SessProc, {?PUBACK, PacketId}) when is_pid(SessProc) ->
cast(SessProc, {puback, PacketId}).
puback(SessProc, {?PUBREL, PacketId}) when is_pid(SessProc) ->
cast(SessPid, {pubrel, PacketId}).
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
cast(SessPid, {pubcomp, PacketId}).
subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
{ok, SessPid, GrantedQos}.
unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}),
{ok, SessPid}.
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}).
cast(SessProc, Msg) ->
gen_server:cast(SessProc, Msg), SessProc.
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([ClientId, ClientPid]) ->
process_flag(trap_exit, true),
true = link(ClientPid),
State = initial_state(ClientId, ClientPid),
MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)),
State1 = State#session{pending_queue = MQueue,
timestamp = os:timestamp()},
{ok, init(emqttd:env(mqtt, session), State1), hibernate}.
init([], State) ->
State;
%% Session expired after hours
init([{expired_after, Hours} | Opts], State) ->
init(Opts, State#session{sess_expired_after = Hours * 3600});
%% Max number of QoS 1 and 2 messages that can be inflight at one time.
init([{max_inflight_messages, MaxInflight} | Opts], State) ->
init(Opts, State#session{inflight_window = MaxInflight});
%% Max retries for unacknolege Qos1/2 messages
init([{max_unack_retries, Retries} | Opts], State) ->
init(Opts, State#session{max_unack_retries = Retries});
%% Retry after 4, 8, 16 seconds
init([{unack_retry_after, Secs} | Opts], State) ->
init(Opts, State#session{unack_retry_after = Secs});
%% Awaiting PUBREL timeout
init([{await_rel_timeout, Secs} | Opts], State) ->
init(Opts, State#session{await_rel_timeout = Secs});
init([Opt | Opts], State) ->
lager:error("Bad Session Option: ~p", [Opt]),
init(Opts, State).
handle_call({subscribe, Topics}, _From, State) ->
{ok, NewState, GrantedQos} = subscribe(State, Topics),
{reply, {ok, GrantedQos}, NewState};
handle_call({unsubscribe, Topics}, _From, State) ->
{ok, NewState} = unsubscribe(State, Topics),
{reply, ok, NewState};
handle_call(Req, _From, State) ->
lager:error("Unexpected request: ~p", [Req]),
{reply, error, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session{
clientid = ClientId,
client_pid = OldClientPid,
msg_queue = Queue,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
%% kick old client...
if
OldClientPid =:= undefined ->
ok;
OldClientPid =:= ClientPid ->
ok;
true ->
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
unlink(OldClientPid),
OldClientPid ! {stop, duplicate_id, ClientPid}
end,
%% cancel timeout timer
emqttd_util:cancel_timer(ETimer),
%% redelivery PUBREL
lists:foreach(fun(PacketId) ->
ClientPid ! {redeliver, {?PUBREL, PacketId}}
end, maps:keys(AwaitingComp)),
%% redelivery messages that awaiting PUBACK or PUBREC
Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end,
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Dup(Msg)}}
end, maps:values(AwaitingAck)),
%% send offline messages
lists:foreach(fun(Msg) ->
ClientPid ! {dispatch, {self(), Msg}}
end, emqttd_queue:all(Queue)),
{noreply, State#session{client_pid = ClientPid,
msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate};
handle_cast({publish, ClientId, {?QOS_2, Message}}, State) ->
NewState = publish(State, ClientId, {?QOS_2, Message}),
{noreply, NewState};
handle_cast({puback, PacketId}, State) ->
NewState = puback(State, {?PUBACK, PacketId}),
{noreply, NewState};
handle_cast({pubrec, PacketId}, State) ->
NewState = puback(State, {?PUBREC, PacketId}),
{noreply, NewState};
handle_cast({pubrel, PacketId}, State) ->
NewState = puback(State, {?PUBREL, PacketId}),
{noreply, NewState};
handle_cast({pubcomp, PacketId}, State) ->
NewState = puback(State, {?PUBCOMP, PacketId}),
{noreply, NewState};
handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]),
{stop, normal, State};
handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
{noreply, State}.
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
F = fun(Message, S) -> dispatch(Message, S) end,
{noreply, lists:foldl(F, State, Messages)};
handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId,
client_pid = ClientPid}) ->
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
{noreply, start_expire_timer(State#session{client_pid = undefined})};
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
{noreply, State};
handle_info(session_expired, State = #session{clientid = ClientId}) ->
lager:warning("Session ~s expired!", [ClientId]),
{stop, {shutdown, expired}, State};
handle_info({timeout, awaiting_rel, MsgId}, SessState) ->
NewState = timeout(awaiting_rel, MsgId, SessState),
{noreply, NewState};
handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -56,6 +56,6 @@ start_session(ClientId, ClientPid) ->
init([]) -> init([]) ->
{ok, {{simple_one_for_one, 10, 10}, {ok, {{simple_one_for_one, 10, 10},
[{session, {emqttd_session, start_link, []}, [{session, {emqttd_session_proc, start_link, []},
transient, 10000, worker, [emqttd_session]}]}}. transient, 10000, worker, [emqttd_session_proc]}]}}.

View File

@ -38,8 +38,6 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
%%cleanSess: true | false
-include("emqttd.hrl"). -include("emqttd.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@ -55,7 +53,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {id, tabid, statsfun}). -record(state, {id, statsfun}).
-define(SM_POOL, sm_pool). -define(SM_POOL, sm_pool).
@ -91,10 +89,21 @@ table() -> ?SESSION_TAB.
%% @doc Start a session %% @doc Start a session
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
start_session(ClientId, ClientPid) -> -spec start_session(CleanSess :: boolean(), binary()) -> {ok, module(), record() | pid()}.
start_session(true, ClientId) ->
%% destroy old session if existed
ok = destroy_session(ClientId),
{ok, emqttd_session, emqttd_session:new(ClientId)};
start_session(false, ClientId) ->
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {start_session, ClientId, ClientPid}). case call(SmPid, {start_session, ClientId, self()}) of
{ok, SessPid} ->
{ok, emqttd_session_proc, SessPid};
{error, Error} ->
{error, Error}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup Session Pid %% @doc Lookup Session Pid
@ -102,7 +111,7 @@ start_session(ClientId, ClientPid) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup_session(binary()) -> pid() | undefined. -spec lookup_session(binary()) -> pid() | undefined.
lookup_session(ClientId) -> lookup_session(ClientId) ->
case ets:lookup(emqttd_sm_sup:table(), ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _}] -> SessPid; [{_, SessPid, _}] -> SessPid;
[] -> undefined [] -> undefined
end. end.
@ -114,7 +123,9 @@ lookup_session(ClientId) ->
-spec destroy_session(binary()) -> ok. -spec destroy_session(binary()) -> ok.
destroy_session(ClientId) -> destroy_session(ClientId) ->
SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId),
gen_server:call(SmPid, {destroy_session, ClientId}). call(SmPid, {destroy_session, ClientId}).
call(SmPid, Req) -> gen_server:call(SmPid, Req).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -128,7 +139,7 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply = Reply =
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] -> [{_, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid), emqttd_session_proc:resume(SessPid, ClientId, ClientPid),
{ok, SessPid}; {ok, SessPid};
[] -> [] ->
case emqttd_session_sup:start_session(ClientId, ClientPid) of case emqttd_session_sup:start_session(ClientId, ClientPid) of
@ -158,8 +169,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Tab}) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(Tab, {'_', DownPid, MRef}), ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_info(_Info, State) -> handle_info(_Info, State) ->

View File

@ -89,12 +89,18 @@
{session, [ {session, [
%% Expired after 2 days %% Expired after 2 days
{expired_after, 48}, {expired_after, 48},
%% Max retries for unacknolege Qos1/2 messages
{max_unack_retries, 3}, %% Max retries for unack Qos1/2 messages
{unack_retries, 3},
%% Retry after 4, 8, 16 seconds %% Retry after 4, 8, 16 seconds
{unack_retry_after, 4}, {unack_timeout, 4},
%% Awaiting PUBREL timeout
{await_rel_timeout, 8} %% Awaiting PUBREL Timeout
{await_rel_timeout, 8},
%% Max Packets that Awaiting PUBREL
{max_awaiting_rel, 100}
]}, ]},
{queue, [ {queue, [
%% Max queue length %% Max queue length