From 05b660ff50e8e08f01ab3b324d0d25e7254adf49 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Sat, 20 Jul 2019 13:05:34 +0800 Subject: [PATCH 1/3] Unlink session when exit message has been forwarded to session (#2703) * Unlink session when exit message has been forwarded to session * Readjust possition of emqx_protocol:session --- src/emqx_ws_channel.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 096e3261f..74a5e2a71 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -328,6 +328,7 @@ terminate_session(Reason, ProtoState) -> undefined -> ok; SessionPid -> + unlink(SessionPid), SessionPid ! {'EXIT', self(), Reason} end. From 0c54d899dab1dfca7771a768068a497c1d01be45 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 20 Jul 2019 13:19:13 +0800 Subject: [PATCH 2/3] Trap and handle exit in channel and ws_channel --- src/emqx_channel.erl | 9 +++++++++ src/emqx_ws_channel.erl | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 65d34d740..0aeedcdfa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -148,6 +148,7 @@ call(CPid, Req) -> %%-------------------------------------------------------------------- init({Transport, RawSocket, Options}) -> + process_flag(trap_exit, true), {ok, Socket} = Transport:wait(RawSocket), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), @@ -365,6 +366,14 @@ handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); +handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {keep_state, State}; + SessionPid -> shutdown(Reason, State) + end; + handle(info, Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {keep_state, State}. diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 74a5e2a71..c308b687e 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -299,6 +299,14 @@ websocket_info({shutdown, Reason}, State) -> websocket_info({stop, Reason}, State) -> {stop, State#state{shutdown = Reason}}; +websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {ok, State}; + SessionPid -> shutdown(Reason, State) + end; + websocket_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. From 721e7c4804e032be3795d7a524117586bd985397 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 20 Jul 2019 14:14:28 +0800 Subject: [PATCH 3/3] Fix session termiated without ws_channel --- src/emqx_channel.erl | 6 +++++- src/emqx_session.erl | 12 +----------- src/emqx_ws_channel.erl | 7 ++++++- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 0aeedcdfa..5aaa62b30 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -371,7 +371,9 @@ handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = P undefined -> ?LOG(error, "Unexpected EXIT: ~p", [Info]), {keep_state, State}; - SessionPid -> shutdown(Reason, State) + SessionPid -> + ?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) end; handle(info, Info, State) -> @@ -508,6 +510,8 @@ maybe_gc(_, State) -> State. reply(From, Reply, State) -> {keep_state, State, [{reply, From, Reply}]}. +shutdown(Reason = {shutdown, _}, State) -> + stop(Reason, State); shutdown(Reason, State) -> stop({shutdown, Reason}, State). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 20028c9ef..7d23c9e62 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -652,23 +652,13 @@ handle_info(Info, State) -> terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, - username = Username, - conn_pid = ConnPid, - old_conn_pid = OldConnPid}) -> + username = Username}) -> send_willmsg(WillMsg), - [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. -maybe_shutdown(undefined, _Reason) -> - ok; -maybe_shutdown(Pid, normal) -> - Pid ! {shutdown, normal}; -maybe_shutdown(Pid, Reason) -> - exit(Pid, Reason). - %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index c308b687e..23a4d8d4c 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -304,7 +304,9 @@ websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = undefined -> ?LOG(error, "Unexpected EXIT: ~p", [Info]), {ok, State}; - SessionPid -> shutdown(Reason, State) + SessionPid -> + ?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) end; websocket_info(Info, State) -> @@ -360,6 +362,9 @@ ensure_stats_timer(State = #state{enable_stats = true, ensure_stats_timer(State) -> State. +shutdown(Reason = {shutdown, _}, State) -> + self() ! {stop, Reason}, + {ok, State}; shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) self() ! {stop, {shutdown, Reason}},