Merge pull request #2705 from emqx/develop
Fix session termiated without ws_channel
This commit is contained in:
commit
2fc3e560d2
|
@ -148,6 +148,7 @@ call(CPid, Req) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init({Transport, RawSocket, Options}) ->
|
init({Transport, RawSocket, Options}) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
{ok, Socket} = Transport:wait(RawSocket),
|
{ok, Socket} = Transport:wait(RawSocket),
|
||||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||||
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||||
|
@ -365,6 +366,16 @@ handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||||
handle(info, {shutdown, Reason}, State) ->
|
handle(info, {shutdown, Reason}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
|
handle(info, Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) ->
|
||||||
|
case emqx_protocol:session(ProtoState) of
|
||||||
|
undefined ->
|
||||||
|
?LOG(error, "Unexpected EXIT: ~p", [Info]),
|
||||||
|
{keep_state, State};
|
||||||
|
SessionPid ->
|
||||||
|
?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]),
|
||||||
|
shutdown(Reason, State)
|
||||||
|
end;
|
||||||
|
|
||||||
handle(info, Info, State) ->
|
handle(info, Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
{keep_state, State}.
|
{keep_state, State}.
|
||||||
|
@ -499,6 +510,8 @@ maybe_gc(_, State) -> State.
|
||||||
reply(From, Reply, State) ->
|
reply(From, Reply, State) ->
|
||||||
{keep_state, State, [{reply, From, Reply}]}.
|
{keep_state, State, [{reply, From, Reply}]}.
|
||||||
|
|
||||||
|
shutdown(Reason = {shutdown, _}, State) ->
|
||||||
|
stop(Reason, State);
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
stop({shutdown, Reason}, State).
|
stop({shutdown, Reason}, State).
|
||||||
|
|
||||||
|
|
|
@ -652,23 +652,13 @@ handle_info(Info, State) ->
|
||||||
|
|
||||||
terminate(Reason, #state{will_msg = WillMsg,
|
terminate(Reason, #state{will_msg = WillMsg,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
username = Username}) ->
|
||||||
conn_pid = ConnPid,
|
|
||||||
old_conn_pid = OldConnPid}) ->
|
|
||||||
send_willmsg(WillMsg),
|
send_willmsg(WillMsg),
|
||||||
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
|
|
||||||
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
|
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
maybe_shutdown(undefined, _Reason) ->
|
|
||||||
ok;
|
|
||||||
maybe_shutdown(Pid, normal) ->
|
|
||||||
Pid ! {shutdown, normal};
|
|
||||||
maybe_shutdown(Pid, Reason) ->
|
|
||||||
exit(Pid, Reason).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -299,6 +299,16 @@ websocket_info({shutdown, Reason}, State) ->
|
||||||
websocket_info({stop, Reason}, State) ->
|
websocket_info({stop, Reason}, State) ->
|
||||||
{stop, State#state{shutdown = Reason}};
|
{stop, State#state{shutdown = Reason}};
|
||||||
|
|
||||||
|
websocket_info(Info = {'EXIT', SessionPid, Reason}, State = #state{proto_state = ProtoState}) ->
|
||||||
|
case emqx_protocol:session(ProtoState) of
|
||||||
|
undefined ->
|
||||||
|
?LOG(error, "Unexpected EXIT: ~p", [Info]),
|
||||||
|
{ok, State};
|
||||||
|
SessionPid ->
|
||||||
|
?LOG(error, "Session ~p termiated: ~p", [SessionPid, Reason]),
|
||||||
|
shutdown(Reason, State)
|
||||||
|
end;
|
||||||
|
|
||||||
websocket_info(Info, State) ->
|
websocket_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -328,6 +338,7 @@ terminate_session(Reason, ProtoState) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
SessionPid ->
|
SessionPid ->
|
||||||
|
unlink(SessionPid),
|
||||||
SessionPid ! {'EXIT', self(), Reason}
|
SessionPid ! {'EXIT', self(), Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -351,6 +362,9 @@ ensure_stats_timer(State = #state{enable_stats = true,
|
||||||
ensure_stats_timer(State) ->
|
ensure_stats_timer(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
shutdown(Reason = {shutdown, _}, State) ->
|
||||||
|
self() ! {stop, Reason},
|
||||||
|
{ok, State};
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
|
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
|
||||||
self() ! {stop, {shutdown, Reason}},
|
self() ! {stop, {shutdown, Reason}},
|
||||||
|
|
Loading…
Reference in New Issue