From 2af91ea1401574f1de099ffe0d7c35f4c5da8b77 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:21:03 +0800 Subject: [PATCH] link with client --- src/emqttd_session.erl | 44 ++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index d2163d3f4..f33e72521 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -81,9 +81,6 @@ %% Client Pid bind with session client_pid :: pid(), - %% Client Monitor - client_mon :: reference(), - %% Last packet id of the session packet_id = 1, @@ -224,7 +221,8 @@ unsubscribe(SessPid, Topics) -> %%%============================================================================= init([CleanSess, ClientId, ClientPid]) -> - %% process_flag(trap_exit, true), + process_flag(trap_exit, true), + true = link(ClientPid), QEnv = emqttd:env(mqtt, queue), SessEnv = emqttd:env(mqtt, session), Session = #session{ @@ -245,10 +243,8 @@ init([CleanSess, ClientId, ClientPid]) -> collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, info(Session)), - %% monitor client - MRef = erlang:monitor(process, ClientPid), %% start statistics - {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. + {ok, start_collector(Session), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of _ -> 0 end. @@ -268,7 +264,6 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of - {'DOWN', _, _, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -368,7 +363,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> #session{client_id = ClientId, client_pid = OldClientPid, - client_mon = MRef, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, @@ -388,10 +382,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> true -> lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p", [ClientId, ClientPid, OldClientPid]), - OldClientPid ! {stop, duplicate_id, ClientPid}, - erlang:demonitor(MRef, [flush]) + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} end, + true = link(ClientPid), + %% Redeliver PUBREL [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], @@ -402,7 +398,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], Session1 = Session#session{client_pid = ClientPid, - client_mon = erlang:monitor(process, ClientPid), awaiting_ack = #{}, awaiting_comp = #{}, expired_timer = undefined}, @@ -548,21 +543,24 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = emqttd_sm:register_session(CleanSess, ClientId, info(Session)), {noreply, start_collector(Session), hibernate}; -handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true, - client_pid = ClientPid}) -> +handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, + client_pid = ClientPid}) -> {stop, normal, Session}; -handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false, - client_pid = ClientPid, - expired_after = Expires}) -> +handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, + client_id = ClientId, + client_pid = ClientPid, + expired_after = Expires}) -> + lager:info("Session(~s): unlink with client ~p: reason=~p", + [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), - noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef}); + noreply(Session#session{client_pid = undefined, expired_timer = TRef}); -handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id = ClientId, - client_pid = ClientPid}) -> - lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: " - "client_pid=~p, down_pid=~p, reason=~p", - [ClientId, ClientPid, Pid, Reason]), +handle_info({'EXIT', Pid, Reason}, Session = #session{client_id = ClientId, + client_pid = ClientPid}) -> + + lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", + [ClientId, ClientPid, Pid, Reason]), noreply(Session); handle_info(session_expired, Session = #session{client_id = ClientId}) ->