diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 65d34d740..5aaa62b30 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -148,6 +148,7 @@ call(CPid, Req) -> %%-------------------------------------------------------------------- init({Transport, RawSocket, Options}) -> + process_flag(trap_exit, true), {ok, Socket} = Transport:wait(RawSocket), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), @@ -365,6 +366,16 @@ handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); +handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {keep_state, State}; + SessionPid -> + ?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) + end; + handle(info, Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {keep_state, State}. @@ -499,6 +510,8 @@ maybe_gc(_, State) -> State. reply(From, Reply, State) -> {keep_state, State, [{reply, From, Reply}]}. +shutdown(Reason = {shutdown, _}, State) -> + stop(Reason, State); shutdown(Reason, State) -> stop({shutdown, Reason}, State). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 20028c9ef..7d23c9e62 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -652,23 +652,13 @@ handle_info(Info, State) -> terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, - username = Username, - conn_pid = ConnPid, - old_conn_pid = OldConnPid}) -> + username = Username}) -> send_willmsg(WillMsg), - [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. -maybe_shutdown(undefined, _Reason) -> - ok; -maybe_shutdown(Pid, normal) -> - Pid ! {shutdown, normal}; -maybe_shutdown(Pid, Reason) -> - exit(Pid, Reason). - %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 096e3261f..23a4d8d4c 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -299,6 +299,16 @@ websocket_info({shutdown, Reason}, State) -> websocket_info({stop, Reason}, State) -> {stop, State#state{shutdown = Reason}}; +websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) -> + case emqx_protocol:session(ProtoState) of + undefined -> + ?LOG(error, "Unexpected EXIT: ~p", [Info]), + {ok, State}; + SessionPid -> + ?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]), + shutdown(Reason, State) + end; + websocket_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. @@ -328,6 +338,7 @@ terminate_session(Reason, ProtoState) -> undefined -> ok; SessionPid -> + unlink(SessionPid), SessionPid ! {'EXIT', self(), Reason} end. @@ -351,6 +362,9 @@ ensure_stats_timer(State = #state{enable_stats = true, ensure_stats_timer(State) -> State. +shutdown(Reason = {shutdown, _}, State) -> + self() ! {stop, Reason}, + {ok, State}; shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) self() ! {stop, {shutdown, Reason}},