diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 4f0743210..f0190dc14 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -370,8 +370,8 @@ init([Parent, #{zone := Zone, topic_alias_maximum = TopicAliasMaximum, will_msg = WillMsg }, - emqx_sm:register_session(ClientId, attrs(State)), - emqx_sm:set_session_stats(ClientId, stats(State)), + ok = emqx_sm:register_session(ClientId, attrs(State)), + true = emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), @@ -617,14 +617,20 @@ handle_info({timeout, Timer, emit_stats}, ?LOG(warning, "shutdown due to ~p", [Reason], NewState), shutdown(Reason, NewState) end; + handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> - ?LOG(info, "expired, shutdown now:(", [], State), + ?LOG(info, "expired, shutdown now.", [], State), shutdown(expired, State); handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> send_willmsg(WillMsg), {noreply, State#state{will_msg = undefined}}; +%% ConnPid is shutting down by the supervisor. +handle_info({'EXIT', ConnPid, Reason}, #state{conn_pid = ConnPid}) + when Reason =:= killed; Reason =:= shutdown -> + exit(Reason); + handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> send_willmsg(WillMsg), {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; @@ -641,30 +647,35 @@ handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", [ConnPid, Pid, Reason], State), {noreply, State}; + handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{will_msg = WillMsg, conn_pid = ConnPid}) -> - %% Should not run hooks here. - %% emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), - %% Let it crash. - %% emqx_sm:unregister_session(ClientId), +terminate(Reason, #state{will_msg = WillMsg, + client_id = ClientId, + conn_pid = ConnPid, + old_conn_pid = OldConnPid}) -> send_willmsg(WillMsg), - %% Ensure to shutdown the connection - if - ConnPid == undefined -> ok; - true -> ConnPid ! {shutdown, Reason} - end. + [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], + emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. +maybe_shutdown(undefined, _Reason) -> + ok; +maybe_shutdown(Pid, normal) -> + Pid ! {shutdown, normal}; +maybe_shutdown(Pid, Reason) -> + exit(Pid, Reason). + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -has_connection(#state{conn_pid = Pid}) -> is_pid(Pid) andalso is_process_alive(Pid). +has_connection(#state{conn_pid = Pid}) -> + is_pid(Pid) andalso is_process_alive(Pid). handle_dispatch(Topic, Msg = #message{headers = Headers}, State = #state{subscriptions = SubMap, diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index df2e4b862..fd6a44231 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -55,7 +55,7 @@ start_link() -> open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> ok = discard_session(ClientId, ConnPid), - emqx_session_sup:start_session(SessAttrs) + emqx_session:start_link(SessAttrs) end, emqx_sm_locker:trans(ClientId, CleanStart); @@ -65,7 +65,7 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) -> {ok, SPid} -> {ok, SPid, true}; {error, not_found} -> - emqx_session_sup:start_session(SessAttrs) + emqx_session:start_link(SessAttrs) end end, emqx_sm_locker:trans(ClientId, ResumeStart). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 2f29dbfee..60be4db87 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -69,8 +69,6 @@ init([]) -> AccessControl = worker_spec(emqx_access_control), %% Session Manager SMSup = supervisor_spec(emqx_sm_sup), - %% Session Sup - SessionSup = supervisor_spec(emqx_session_sup), %% Connection Manager CMSup = supervisor_spec(emqx_cm_sup), %% Sys Sup @@ -83,7 +81,6 @@ init([]) -> BridgeSup, AccessControl, SMSup, - SessionSup, CMSup, SysSup]}}.