diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 3dd0fcdf2..48f9bb79a 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -109,11 +109,6 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> ignore; [{_, OldPid, MRef}] -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), - %%TODO: tell session old client is down here? - case emqttd_sm:lookup_session(ClientId) of - undefined -> ok; - SessPid -> emqttd_session:client_down(SessPid, {OldPid, duplicate_id}) - end, OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 77c0ae7a3..45681dcd9 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -42,7 +42,7 @@ -export([store/2]). %% Start gen_server --export([start_link/3, client_down/2]). +-export([start_link/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -259,13 +259,6 @@ initial_state(ClientId, ClientPid) -> start_link(SessOpts, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). -%%------------------------------------------------------------------------------ -%% @doc Notify the session process that client will be DOWN. -%% @end -%%------------------------------------------------------------------------------ -client_down(SessPid, {ClientPid, Reason}) -> - gen_server:cast(SessPid, {'DOWN', ClientPid, Reason}). - %%%============================================================================= %%% gen_server callbacks %%%============================================================================= @@ -295,14 +288,27 @@ handle_call(Req, _From, State) -> handle_cast({resume, ClientId, ClientPid}, State = #session_state{ clientid = ClientId, - client_pid = undefined, + client_pid = OldClientPid, msg_queue = Queue, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, expire_timer = ETimer}) -> lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), - %cancel timeout timer - erlang:cancel_timer(ETimer), + + %% kick old client... + if + OldClientPid =:= undefined -> + ok; + OldClientPid =:= ClientPid -> + ok; + true -> + lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), + unlink(OldClientPid), + OldClientPid ! {stop, duplicate_id, ClientPid} + end, + + %% cancel timeout timer + emqttd_utils:cancel_timer(ETimer), %% redelivery PUBREL lists:foreach(fun(PacketId) -> @@ -348,17 +354,6 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s destroyed", [ClientId]), {stop, normal, State}; -handle_cast({resume, ClientId, ClientPid}, State) -> - lager:error("Cannot resume session ~p with pid ~p: ~p", - [ClientId, ClientPid, State]), - {noreply, State}; - -handle_cast({'DOWN', ClientPid, Reason}, State = #session_state{clientid = ClientId, - client_pid = ClientPid}) -> - lager:error("Session: client ~s@~p is down for ~p", [ClientId, ClientPid, Reason]), - unlink(ClientPid), - {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; - handle_cast(Msg, State) -> lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), {noreply, State}. @@ -375,8 +370,8 @@ handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = Clien lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; -handle_info({'EXIT', ClientPid, _Reason}, State = #session_state{client_pid = OtherClientPid}) -> - lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid, OtherClientPid]), +handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) -> + lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]), {noreply, State}; handle_info(session_expired, State = #session_state{clientid = ClientId}) ->