diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index c6b3448a5..bd500ab26 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -152,13 +152,13 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} emqttd_cm:register(client(State2)), %%Starting session - {ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}), + {ok, SessMod, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), %% Start keepalive start_keepalive(KeepAlive), %% ACCEPT - {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; + {?CONNACK_ACCEPT, State2#proto_state{sessmod = SessMod, session = Session, will_msg = willmsg(Var)}}; {error, Reason}-> lager:error("~s@~s: username '~s', login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 4a1f44309..fce1c766b 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -21,7 +21,7 @@ %%%----------------------------------------------------------------------------- %%% @doc %%% -%%% emqttd session. +%%% emqttd session for persistent client. %%% %%% Session State in the broker consists of: %%% @@ -53,10 +53,11 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). -%% Session Managenent APIs --export([start/1, - resume/3, - destroy/2]). +%% Start gen_server +-export([start_link/2, resume/3, destroy/2]). + +%% Init Session State +-export([new/1]). %% PubSub APIs -export([publish/3, @@ -66,10 +67,17 @@ await/2, dispatch/2]). +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + -record(session, { %% ClientId: Identifier of Session clientid :: binary(), + %% Clean Session Flag + clean_sess = true, + %% Client Pid linked with session client_pid :: pid(), @@ -111,63 +119,103 @@ %% Awaiting PUBREL timeout await_rel_timeout = 8, - %% session expired after 48 hours - sess_expired_after = 172800, + %% Max Packets that Awaiting PUBREL + max_awaiting_rel = 100, - sess_expired_timer, + %% session expired after 48 hours + expired_after = 172800, + + expired_timer, timestamp}). -type session() :: #session{}. --export_type([session/0]). - --define(SESSION(Sess), is_record(Sess, session)). - %%%============================================================================= %%% Session API %%%============================================================================= %%------------------------------------------------------------------------------ -%% @doc Start Session +%% @doc Start a session process. %% @end %%------------------------------------------------------------------------------ --spec start({boolean(), binary(), pid()}) -> {ok, session()}. -start({true = _CleanSess, ClientId, _ClientPid}) -> - %%Destroy old session if CleanSess is true before. - ok = emqttd_sm:destroy_session(ClientId), - {ok, initial_state(ClientId)}. +start_link(ClientId, ClientPid) -> + gen_server:start_link(?MODULE, [ClientId, ClientPid], []). %%------------------------------------------------------------------------------ -%% @doc Resume Session +%% @doc Resume a session. %% @end %%------------------------------------------------------------------------------ --spec resume(session(), binary(), pid()) -> session(). -resume(Session = #session{}, _ClientId, _ClientPid) -> - Session. +resume(Session, _ClientId, _ClientPid) when is_record(Session, session) -> + Session; +resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> + gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid. + +%%------------------------------------------------------------------------------ +%% @doc Destroy a session. +%% @end +%%------------------------------------------------------------------------------ +-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. +destroy(SessPid, ClientId) when is_pid(SessPid) -> + gen_server:cast(SessPid, {destroy, ClientId}), SessPid. + +%%------------------------------------------------------------------------------ +%% @doc Init Session State. +%% @end +%%------------------------------------------------------------------------------ +-spec new(binary()) -> session(). +new(ClientId) -> + QEnv = emqttd:env(mqtt, queue), + SessEnv = emqttd:env(mqtt, session), + #session{ + clientid = ClientId, + clean_sess = true, + subscriptions = [], + inflight_window = emqttd_mqwin:new(ClientId, QEnv), + pending_queue = emqttd_mqueue:new(ClientId, QEnv), + awaiting_rel = #{}, + awaiting_ack = #{}, + awaiting_comp = #{}, + unack_retries = emqttd_opts:g(unack_retries, SessEnv), + unack_timeout = emqttd_opts:g(unack_timeout, SessEnv), + await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), + max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), + expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600 + }. %%------------------------------------------------------------------------------ %% @doc Publish message %% @end %%------------------------------------------------------------------------------ --spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). +-spec publish(session() | pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session() | pid(). publish(Session, ClientId, {?QOS_0, Message}) -> + %% publish qos0 directly emqttd_pubsub:publish(ClientId, Message), Session; publish(Session, ClientId, {?QOS_1, Message}) -> + %% publish qos1 directly, and client will puback emqttd_pubsub:publish(ClientId, Message), Session; -publish(Session = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}, _ClientId, +publish(Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout, + max_awaiting_rel = MaxLen}, ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> - %% store in awaiting_rel - TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), - Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}. + case maps:size(AwaitingRel) >= MaxLen of + true -> lager:error([{clientid, ClientId}], "Session ~s " + " dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]); + false -> + %% store in awaiting_rel + TRef = erlang:send_after(Timeout * 1000, self(), {timeout, awaiting_rel, MsgId}), + Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}; + end; +publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), SessPid. %%------------------------------------------------------------------------------ %% @doc PubAck message %% @end %%------------------------------------------------------------------------------ + -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> case maps:is_key(PacketId, Awaiting) of @@ -176,6 +224,9 @@ puback(Session = #session{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBAC end, Session#session{awaiting_ack = maps:remove(PacketId, Awaiting)}; +puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {puback, PacketId}); + %% PUBREC puback(Session = #session{clientid = ClientId, awaiting_ack = AwaitingAck, @@ -187,18 +238,23 @@ puback(Session = #session{clientid = ClientId, Session#session{awaiting_ack = maps:remove(PacketId, AwaitingAck), awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; +puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; + %% PUBREL -puback(Session = #session{clientid = ClientId, - awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> +puback(Session = #session{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of {ok, {Msg, TRef}} -> catch erlang:cancel_timer(TRef), emqttd_pubsub:publish(ClientId, Msg); error -> - lager:error("Session ~s PUBREL PacketId '~p' not found!", [ClientId, PacketId]) + lager:error("Session ~s cannot find PUBREL PacketId '~p'!", [ClientId, PacketId]) end, Session#session{awaiting_rel = maps:remove(PacketId, Awaiting)}; +puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> + cast(SessPid, {pubrel, PacketId}); + %% PUBCOMP puback(Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> @@ -208,6 +264,9 @@ puback(Session = #session{clientid = ClientId, end, Session#session{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; +puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> + cast(SessPid, {pubcomp, PacketId}). + timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) -> case maps:find(MsgId, Awaiting) of {ok, {Msg, _TRef}} -> @@ -222,7 +281,7 @@ timeout(awaiting_rel, MsgId, Session = #session{clientid = ClientId, awaiting_re %% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ --spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. +-spec subscribe(session() | pid(), [{binary(), mqtt_qos()}]) -> {ok, session() | pid(), [mqtt_qos()]}. subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %% subscribe first and don't care if the subscriptions have been existed @@ -252,11 +311,15 @@ subscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions} {ok, Session#session{subscriptions = Subscriptions1}, GrantedQos}; +subscribe(SessPid, Topics) when is_pid(SessPid) -> + {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), + {ok, SessPid, GrantedQos}. + %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topics %% @end %%------------------------------------------------------------------------------ --spec unsubscribe(session(), [binary()]) -> {ok, session()}. +-spec unsubscribe(session() | pid(), [binary()]) -> {ok, session() | pid()}. unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscriptions}, Topics) -> %%unsubscribe from topic tree @@ -275,6 +338,10 @@ unsubscribe(Session = #session{clientid = ClientId, subscriptions = Subscription {ok, Session#session{subscriptions = Subscriptions1}}; +unsubscribe(SessPid, Topics) when is_pid(SessPid) -> + gen_server:call(SessPid, {unsubscribe, Topics}), + {ok, SessPid}. + %%------------------------------------------------------------------------------ %% @doc Destroy Session %% @end @@ -306,19 +373,140 @@ await_ack(Message = #mqtt_message{qos = Qos}, Session = #session{message_id = Ms Awaiting1 = maps:put(MsgId, Message2, Awaiting), {Message1, next_msgid(Session#session{awaiting_ack = Awaiting1})}. -initial_state(ClientId) -> - %%TODO: init session options. - #session{clientid = ClientId, - subscriptions = [], - inflight_queue = [], - awaiting_queue = [], - awaiting_ack = #{}, - awaiting_rel = #{}, - awaiting_comp = #{}}. -initial_state(ClientId, ClientPid) -> - State = initial_state(ClientId), - State#session{client_pid = ClientPid}. +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= +init([ClientId, ClientPid]) -> + process_flag(trap_exit, true), + true = link(ClientPid), + Session = emqttd_session:new(ClientId), + {ok, Session#session{clean_sess = false, + client_pid = ClientPid, + timestamp = os:timestamp()}, hibernate}. + + +handle_call({subscribe, Topics}, _From, Session) -> + {ok, NewSession, GrantedQos} = subscribe(Session, Topics), + {reply, {ok, GrantedQos}, NewSession}; + +handle_call({unsubscribe, Topics}, _From, Session) -> + {ok, NewSession} = unsubscribe(Session, Topics), + {reply, ok, NewSession}; + +handle_call(Req, _From, State) -> + lager:error("Unexpected Request: ~p", [Req]), + {reply, {error, badreq}, State}. + +handle_cast({resume, ClientId, ClientPid}, State = #session{ + clientid = ClientId, + client_pid = OldClientPid, + msg_queue = Queue, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expire_timer = ETimer}) -> + + lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), + + %% kick old client... + if + OldClientPid =:= undefined -> + ok; + OldClientPid =:= ClientPid -> + ok; + true -> + lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} + end, + + %% cancel timeout timer + emqttd_util:cancel_timer(ETimer), + + %% redelivery PUBREL + lists:foreach(fun(PacketId) -> + ClientPid ! {redeliver, {?PUBREL, PacketId}} + end, maps:keys(AwaitingComp)), + + %% redelivery messages that awaiting PUBACK or PUBREC + Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Dup(Msg)}} + end, maps:values(AwaitingAck)), + + %% send offline messages + lists:foreach(fun(Msg) -> + ClientPid ! {dispatch, {self(), Msg}} + end, emqttd_queue:all(Queue)), + + {noreply, State#session{client_pid = ClientPid, + msg_queue = emqttd_queue:clear(Queue), + expire_timer = undefined}, hibernate}; + +handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> + NewState = publish(State, ClientId, {?QOS_2, Message}), + {noreply, NewState}; + +handle_cast({puback, PacketId}, State) -> + NewState = puback(State, {?PUBACK, PacketId}), + {noreply, NewState}; + +handle_cast({pubrec, PacketId}, State) -> + NewState = puback(State, {?PUBREC, PacketId}), + {noreply, NewState}; + +handle_cast({pubrel, PacketId}, State) -> + NewState = puback(State, {?PUBREL, PacketId}), + {noreply, NewState}; + +handle_cast({pubcomp, PacketId}, State) -> + NewState = puback(State, {?PUBCOMP, PacketId}), + {noreply, NewState}; + +handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) -> + lager:warning("Session ~s destroyed", [ClientId]), + {stop, normal, State}; + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + +handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> + F = fun(Message, S) -> dispatch(Message, S) end, + {noreply, lists:foldl(F, State, Messages)}; + +handle_info({dispatch, {_From, Message}}, State) -> + {noreply, dispatch(Message, State)}; + +handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId, + client_pid = ClientPid}) -> + lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), + {noreply, start_expire_timer(State#session{client_pid = undefined})}; + +handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> + lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), + {noreply, State}; + +handle_info(session_expired, State = #session{clientid = ClientId}) -> + lager:warning("Session ~s expired!", [ClientId]), + {stop, {shutdown, expired}, State}; + +handle_info({timeout, awaiting_rel, MsgId}, SessState) -> + NewState = timeout(awaiting_rel, MsgId, SessState), + {noreply, NewState}; + +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + %%%============================================================================= %%% Internal functions diff --git a/apps/emqttd/src/emqttd_session_proc.erl b/apps/emqttd/src/emqttd_session_proc.erl deleted file mode 100644 index 04d65563b..000000000 --- a/apps/emqttd/src/emqttd_session_proc.erl +++ /dev/null @@ -1,255 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd session process of persistent client. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --module(emqttd_session_proc). - --author("Feng Lee "). - --include("emqttd.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). - -%% Start gen_server --export([start_link/2]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% Refactor this API. -start({false = _CleanSess, ClientId, ClientPid}) -> - {ok, SessPid} = emqttd_sm:start_session(ClientId, ClientPid), - {ok, SessPid}. - -%%------------------------------------------------------------------------------ -%% @doc Start a session process. -%% @end -%%------------------------------------------------------------------------------ -start_link(ClientId, ClientPid) -> - gen_server:start_link(?MODULE, [ClientId, ClientPid], []). - -resume(SessProc, ClientId, ClientPid) when is_pid(SessProc) -> - cast(SessProc, {resume, ClientId, ClientPid}). - --spec publish(pid(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> pid(). -publish(SessProc, ClientId, {?QOS_0, Message}) when is_pid(SessProc) -> - emqttd_pubsub:publish(ClientId, Message), Session; - -publish(SessProc, ClientId, {?QOS_1, Message}) when is_pid(SessProc) -> - emqttd_pubsub:publish(ClientId, Message), Session; - -publish(SessProc, ClientId, {?QOS_2, Message}) when is_pid(SessProc) -> - cast(SessProc, {publish, ClientId, {?QOS_2, Message}}). - -puback(SessProc, {?PUBACK, PacketId}) when is_pid(SessProc) -> - cast(SessProc, {puback, PacketId}). - -puback(SessProc, {?PUBREL, PacketId}) when is_pid(SessProc) -> - cast(SessPid, {pubrel, PacketId}). - -puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> - cast(SessPid, {pubcomp, PacketId}). - -subscribe(SessPid, Topics) when is_pid(SessPid) -> - {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), - {ok, SessPid, GrantedQos}. - -unsubscribe(SessPid, Topics) when is_pid(SessPid) -> - gen_server:call(SessPid, {unsubscribe, Topics}), - {ok, SessPid}. - --spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. -destroy(SessPid, ClientId) when is_pid(SessPid) -> - gen_server:cast(SessPid, {destroy, ClientId}). - -cast(SessProc, Msg) -> - gen_server:cast(SessProc, Msg), SessProc. - - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([ClientId, ClientPid]) -> - process_flag(trap_exit, true), - true = link(ClientPid), - State = initial_state(ClientId, ClientPid), - MQueue = emqttd_mqueue:new(ClientId, emqttd:env(mqtt, queue)), - State1 = State#session{pending_queue = MQueue, - timestamp = os:timestamp()}, - {ok, init(emqttd:env(mqtt, session), State1), hibernate}. - -init([], State) -> - State; - -%% Session expired after hours -init([{expired_after, Hours} | Opts], State) -> - init(Opts, State#session{sess_expired_after = Hours * 3600}); - -%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. -init([{max_inflight_messages, MaxInflight} | Opts], State) -> - init(Opts, State#session{inflight_window = MaxInflight}); - -%% Max retries for unacknolege Qos1/2 messages -init([{max_unack_retries, Retries} | Opts], State) -> - init(Opts, State#session{max_unack_retries = Retries}); - -%% Retry after 4, 8, 16 seconds -init([{unack_retry_after, Secs} | Opts], State) -> - init(Opts, State#session{unack_retry_after = Secs}); - -%% Awaiting PUBREL timeout -init([{await_rel_timeout, Secs} | Opts], State) -> - init(Opts, State#session{await_rel_timeout = Secs}); - -init([Opt | Opts], State) -> - lager:error("Bad Session Option: ~p", [Opt]), - init(Opts, State). - -handle_call({subscribe, Topics}, _From, State) -> - {ok, NewState, GrantedQos} = subscribe(State, Topics), - {reply, {ok, GrantedQos}, NewState}; - -handle_call({unsubscribe, Topics}, _From, State) -> - {ok, NewState} = unsubscribe(State, Topics), - {reply, ok, NewState}; - -handle_call(Req, _From, State) -> - lager:error("Unexpected request: ~p", [Req]), - {reply, error, State}. - -handle_cast({resume, ClientId, ClientPid}, State = #session{ - clientid = ClientId, - client_pid = OldClientPid, - msg_queue = Queue, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expire_timer = ETimer}) -> - - lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), - - %% kick old client... - if - OldClientPid =:= undefined -> - ok; - OldClientPid =:= ClientPid -> - ok; - true -> - lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), - unlink(OldClientPid), - OldClientPid ! {stop, duplicate_id, ClientPid} - end, - - %% cancel timeout timer - emqttd_util:cancel_timer(ETimer), - - %% redelivery PUBREL - lists:foreach(fun(PacketId) -> - ClientPid ! {redeliver, {?PUBREL, PacketId}} - end, maps:keys(AwaitingComp)), - - %% redelivery messages that awaiting PUBACK or PUBREC - Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Dup(Msg)}} - end, maps:values(AwaitingAck)), - - %% send offline messages - lists:foreach(fun(Msg) -> - ClientPid ! {dispatch, {self(), Msg}} - end, emqttd_queue:all(Queue)), - - {noreply, State#session{client_pid = ClientPid, - msg_queue = emqttd_queue:clear(Queue), - expire_timer = undefined}, hibernate}; - - -handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> - NewState = publish(State, ClientId, {?QOS_2, Message}), - {noreply, NewState}; - -handle_cast({puback, PacketId}, State) -> - NewState = puback(State, {?PUBACK, PacketId}), - {noreply, NewState}; - -handle_cast({pubrec, PacketId}, State) -> - NewState = puback(State, {?PUBREC, PacketId}), - {noreply, NewState}; - -handle_cast({pubrel, PacketId}, State) -> - NewState = puback(State, {?PUBREL, PacketId}), - {noreply, NewState}; - -handle_cast({pubcomp, PacketId}, State) -> - NewState = puback(State, {?PUBCOMP, PacketId}), - {noreply, NewState}; - -handle_cast({destroy, ClientId}, State = #session{clientid = ClientId}) -> - lager:warning("Session ~s destroyed", [ClientId]), - {stop, normal, State}; - -handle_cast(Msg, State) -> - lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), - {noreply, State}. - -handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> - F = fun(Message, S) -> dispatch(Message, S) end, - {noreply, lists:foldl(F, State, Messages)}; - -handle_info({dispatch, {_From, Message}}, State) -> - {noreply, dispatch(Message, State)}; - -handle_info({'EXIT', ClientPid, Reason}, State = #session{clientid = ClientId, - client_pid = ClientPid}) -> - lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]), - {noreply, start_expire_timer(State#session{client_pid = undefined})}; - -handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) -> - lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), - {noreply, State}; - -handle_info(session_expired, State = #session{clientid = ClientId}) -> - lager:warning("Session ~s expired!", [ClientId]), - {stop, {shutdown, expired}, State}; - -handle_info({timeout, awaiting_rel, MsgId}, SessState) -> - NewState = timeout(awaiting_rel, MsgId, SessState), - {noreply, NewState}; - -handle_info(Info, State) -> - lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index 7ab0cc18a..d4f790847 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -56,6 +56,6 @@ start_session(ClientId, ClientPid) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, - [{session, {emqttd_session, start_link, []}, - transient, 10000, worker, [emqttd_session]}]}}. + [{session, {emqttd_session_proc, start_link, []}, + transient, 10000, worker, [emqttd_session_proc]}]}}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index a454d6fd9..56c0ea9f9 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -38,8 +38,6 @@ -author("Feng Lee "). -%%cleanSess: true | false - -include("emqttd.hrl"). -behaviour(gen_server). @@ -55,7 +53,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {id, tabid, statsfun}). +-record(state, {id, statsfun}). -define(SM_POOL, sm_pool). @@ -91,10 +89,21 @@ table() -> ?SESSION_TAB. %% @doc Start a session %% @end %%------------------------------------------------------------------------------ --spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. -start_session(ClientId, ClientPid) -> + +-spec start_session(CleanSess :: boolean(), binary()) -> {ok, module(), record() | pid()}. +start_session(true, ClientId) -> + %% destroy old session if existed + ok = destroy_session(ClientId), + {ok, emqttd_session, emqttd_session:new(ClientId)}; + +start_session(false, ClientId) -> SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:call(SmPid, {start_session, ClientId, ClientPid}). + case call(SmPid, {start_session, ClientId, self()}) of + {ok, SessPid} -> + {ok, emqttd_session_proc, SessPid}; + {error, Error} -> + {error, Error} + end. %%------------------------------------------------------------------------------ %% @doc Lookup Session Pid @@ -102,7 +111,7 @@ start_session(ClientId, ClientPid) -> %%------------------------------------------------------------------------------ -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(emqttd_sm_sup:table(), ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. @@ -114,7 +123,9 @@ lookup_session(ClientId) -> -spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> SmPid = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server:call(SmPid, {destroy_session, ClientId}). + call(SmPid, {destroy_session, ClientId}). + +call(SmPid, Req) -> gen_server:call(SmPid, Req). %%%============================================================================= %%% gen_server callbacks @@ -128,11 +139,11 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> - emqttd_session:resume(SessPid, ClientId, ClientPid), + emqttd_session_proc:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> case emqttd_session_sup:start_session(ClientId, ClientPid) of - {ok, SessPid} -> + {ok, SessPid} -> ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), {ok, SessPid}; {error, Error} -> @@ -158,8 +169,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tabid = Tab}) -> - ets:match_delete(Tab, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index 2df163bae..9995ddea1 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -89,12 +89,18 @@ {session, [ %% Expired after 2 days {expired_after, 48}, - %% Max retries for unacknolege Qos1/2 messages - {max_unack_retries, 3}, + + %% Max retries for unack Qos1/2 messages + {unack_retries, 3}, + %% Retry after 4, 8, 16 seconds - {unack_retry_after, 4}, - %% Awaiting PUBREL timeout - {await_rel_timeout, 8} + {unack_timeout, 4}, + + %% Awaiting PUBREL Timeout + {await_rel_timeout, 8}, + + %% Max Packets that Awaiting PUBREL + {max_awaiting_rel, 100} ]}, {queue, [ %% Max queue length