Merge pull request #1567 from terry-xiaoyu/clean_dead_session

clean dead persistent session on connect
This commit is contained in:
turtleDeng 2018-04-21 09:58:40 +08:00 committed by GitHub
commit 2ee18ddebc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 24 deletions

View File

@ -56,7 +56,7 @@ format_header(#mqtt_packet_header{type = Type,
dup = Dup, dup = Dup,
qos = QoS, qos = QoS,
retain = Retain}, S) -> retain = Retain}, S) ->
S1 = if S1 = if
S == undefined -> <<>>; S == undefined -> <<>>;
true -> [", ", S] true -> [", ", S]
end, end,
@ -78,13 +78,13 @@ format_variable(#mqtt_packet_connect{
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = KeepAlive, keep_alive = KeepAlive,
client_id = ClientId, client_id = ClientId,
will_topic = WillTopic, will_topic = WillTopic,
will_msg = WillMsg, will_msg = WillMsg,
username = Username, username = Username,
password = Password}) -> password = Password}) ->
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s", Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)], Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)],
{Format1, Args1} = if {Format1, Args1} = if
WillFlag -> { Format ++ ", Will(Q~p, R~p, Topic=~s, Msg=~s)", WillFlag -> { Format ++ ", Will(Q~p, R~p, Topic=~s, Msg=~s)",
Args ++ [WillQoS, i(WillRetain), WillTopic, WillMsg] }; Args ++ [WillQoS, i(WillRetain), WillTopic, WillMsg] };
true -> {Format, Args} true -> {Format, Args}
@ -93,7 +93,7 @@ format_variable(#mqtt_packet_connect{
format_variable(#mqtt_packet_connack{ack_flags = AckFlags, format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}) -> return_code = ReturnCode}) ->
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); io_lib:format("AckFlags=~p, ReturnCode=~p", [AckFlags, ReturnCode]);
format_variable(#mqtt_packet_publish{topic_name = TopicName, format_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) -> packet_id = PacketId}) ->

View File

@ -137,7 +137,7 @@ session(#proto_state{session = Session}) ->
%% CONNECT Client requests a connection to a Server %% CONNECT Client requests a connection to a Server
%% A Client can only send the CONNECT Packet once over a Network Connection. %% A Client can only send the CONNECT Packet once over a Network Connection.
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}). -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}).
received(Packet = ?PACKET(?CONNECT), received(Packet = ?PACKET(?CONNECT),
State = #proto_state{connected = false, stats_data = Stats}) -> State = #proto_state{connected = false, stats_data = Stats}) ->
@ -228,7 +228,8 @@ process(?CONNECT_PACKET(Var), State0) ->
%% ACCEPT %% ACCEPT
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
{error, Error} -> {error, Error} ->
{stop, {shutdown, Error}, State2} ?LOG(error, "Username '~s' login failed for ~p", [Username, Error], State2),
{?CONNACK_SERVER, false, State2}
end; end;
{error, Reason}-> {error, Reason}->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
@ -449,14 +450,14 @@ start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 ->
validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
case validate_protocol(Connect) of case validate_protocol(Connect) of
true -> true ->
case validate_clientid(Connect, ProtoState) of case validate_clientid(Connect, ProtoState) of
true -> true ->
?CONNACK_ACCEPT; ?CONNACK_ACCEPT;
false -> false ->
?CONNACK_INVALID_ID ?CONNACK_INVALID_ID
end; end;
false -> false ->
?CONNACK_PROTO_VER ?CONNACK_PROTO_VER
end. end.
@ -498,7 +499,7 @@ validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) ->
validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) -> validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) ->
validate_topics(filter, Topics); validate_topics(filter, Topics);
validate_packet(_Packet) -> validate_packet(_Packet) ->
ok. ok.
validate_topics(_Type, []) -> validate_topics(_Type, []) ->
@ -593,4 +594,3 @@ unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
{MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0};
_ -> Msg _ -> Msg
end. end.

View File

@ -183,15 +183,16 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
{ok, ClientId} -> {ok, ClientId} ->
case mnesia:dirty_read({mqtt_session, ClientId}) of NewState =
[] -> case mnesia:dirty_read({mqtt_session, ClientId}) of
ok; [] -> State;
[Sess = #mqtt_session{sess_pid = DownPid}] -> [Sess = #mqtt_session{sess_pid = DownPid}] ->
mnesia:dirty_delete_object(Sess); mnesia:dirty_delete_object(Sess),
[_Sess] -> erase_monitor(MRef, State);
ok [_Sess] ->
end, State
{noreply, erase_monitor(MRef, State), hibernate}; end,
{noreply, NewState, hibernate};
error -> error ->
lager:error("MRef of session ~p not found", [DownPid]), lager:error("MRef of session ~p not found", [DownPid]),
{noreply, State} {noreply, State}
@ -256,6 +257,7 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}
{ok, SessPid}; {ok, SessPid};
false -> false ->
?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session),
remove_session(Session),
{error, session_died} {error, session_died}
end; end;
@ -305,4 +307,3 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
erase_monitor(MRef, State = #state{monitors = Monitors}) -> erase_monitor(MRef, State = #state{monitors = Monitors}) ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
State#state{monitors = dict:erase(MRef, Monitors)}. State#state{monitors = dict:erase(MRef, Monitors)}.