From 5039b751f4c4c42c27aa184c16ec09715286192b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Tue, 17 Apr 2018 16:52:05 +0800 Subject: [PATCH] clean dead persistent session on connect --- src/emqttd_packet.erl | 12 ++++++------ src/emqttd_protocol.erl | 16 ++++++++-------- src/emqttd_sm.erl | 21 +++++++++++---------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index f269f3dbe..d909089d1 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -56,7 +56,7 @@ format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) -> - S1 = if + S1 = if S == undefined -> <<>>; true -> [", ", S] end, @@ -78,13 +78,13 @@ format_variable(#mqtt_packet_connect{ clean_sess = CleanSess, keep_alive = KeepAlive, client_id = ClientId, - will_topic = WillTopic, - will_msg = WillMsg, - username = Username, + will_topic = WillTopic, + will_msg = WillMsg, + username = Username, password = Password}) -> Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s", Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)], - {Format1, Args1} = if + {Format1, Args1} = if WillFlag -> { Format ++ ", Will(Q~p, R~p, Topic=~s, Msg=~s)", Args ++ [WillQoS, i(WillRetain), WillTopic, WillMsg] }; true -> {Format, Args} @@ -93,7 +93,7 @@ format_variable(#mqtt_packet_connect{ format_variable(#mqtt_packet_connack{ack_flags = AckFlags, return_code = ReturnCode}) -> - io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); + io_lib:format("AckFlags=~p, ReturnCode=~p", [AckFlags, ReturnCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 3531b165f..ea6a11ebf 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -137,7 +137,7 @@ session(#proto_state{session = Session}) -> %% CONNECT – Client requests a connection to a Server -%% A Client can only send the CONNECT Packet once over a Network Connection. +%% A Client can only send the CONNECT Packet once over a Network Connection. -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}). received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false, stats_data = Stats}) -> @@ -228,7 +228,8 @@ process(?CONNECT_PACKET(Var), State0) -> %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> - {stop, {shutdown, Error}, State2} + ?LOG(error, "Username '~s' login failed for ~p", [Username, Error], State2), + {?CONNACK_SERVER, false, State2} end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), @@ -449,14 +450,14 @@ start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 -> validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> case validate_protocol(Connect) of - true -> + true -> case validate_clientid(Connect, ProtoState) of - true -> + true -> ?CONNACK_ACCEPT; - false -> + false -> ?CONNACK_INVALID_ID end; - false -> + false -> ?CONNACK_PROTO_VER end. @@ -498,7 +499,7 @@ validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) -> validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) -> validate_topics(filter, Topics); -validate_packet(_Packet) -> +validate_packet(_Packet) -> ok. validate_topics(_Type, []) -> @@ -593,4 +594,3 @@ unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; _ -> Msg end. - diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index e2e332041..5d9b900c3 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -183,15 +183,16 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of {ok, ClientId} -> - case mnesia:dirty_read({mqtt_session, ClientId}) of - [] -> - ok; - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:dirty_delete_object(Sess); - [_Sess] -> - ok - end, - {noreply, erase_monitor(MRef, State), hibernate}; + NewState = + case mnesia:dirty_read({mqtt_session, ClientId}) of + [] -> State; + [Sess = #mqtt_session{sess_pid = DownPid}] -> + mnesia:dirty_delete_object(Sess), + erase_monitor(MRef, State); + [_Sess] -> + State + end, + {noreply, NewState, hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), {noreply, State} @@ -256,6 +257,7 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid} {ok, SessPid}; false -> ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), + remove_session(Session), {error, session_died} end; @@ -305,4 +307,3 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> erase_monitor(MRef, State = #state{monitors = Monitors}) -> erlang:demonitor(MRef, [flush]), State#state{monitors = dict:erase(MRef, Monitors)}. -