fix duplicated session
This commit is contained in:
parent
77a26ded12
commit
e86f9ac6aa
|
@ -109,11 +109,6 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
||||||
ignore;
|
ignore;
|
||||||
[{_, OldPid, MRef}] ->
|
[{_, OldPid, MRef}] ->
|
||||||
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
|
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
|
||||||
%%TODO: tell session old client is down here?
|
|
||||||
case emqttd_sm:lookup_session(ClientId) of
|
|
||||||
undefined -> ok;
|
|
||||||
SessPid -> emqttd_session:client_down(SessPid, {OldPid, duplicate_id})
|
|
||||||
end,
|
|
||||||
OldPid ! {stop, duplicate_id, Pid},
|
OldPid ! {stop, duplicate_id, Pid},
|
||||||
erlang:demonitor(MRef),
|
erlang:demonitor(MRef),
|
||||||
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
|
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
|
||||||
|
|
|
@ -42,7 +42,7 @@
|
||||||
-export([store/2]).
|
-export([store/2]).
|
||||||
|
|
||||||
%% Start gen_server
|
%% Start gen_server
|
||||||
-export([start_link/3, client_down/2]).
|
-export([start_link/3]).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
@ -259,13 +259,6 @@ initial_state(ClientId, ClientPid) ->
|
||||||
start_link(SessOpts, ClientId, ClientPid) ->
|
start_link(SessOpts, ClientId, ClientPid) ->
|
||||||
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
|
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% @doc Notify the session process that client will be DOWN.
|
|
||||||
%% @end
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
client_down(SessPid, {ClientPid, Reason}) ->
|
|
||||||
gen_server:cast(SessPid, {'DOWN', ClientPid, Reason}).
|
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
@ -295,14 +288,27 @@ handle_call(Req, _From, State) ->
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, State = #session_state{
|
handle_cast({resume, ClientId, ClientPid}, State = #session_state{
|
||||||
clientid = ClientId,
|
clientid = ClientId,
|
||||||
client_pid = undefined,
|
client_pid = OldClientPid,
|
||||||
msg_queue = Queue,
|
msg_queue = Queue,
|
||||||
awaiting_ack = AwaitingAck,
|
awaiting_ack = AwaitingAck,
|
||||||
awaiting_comp = AwaitingComp,
|
awaiting_comp = AwaitingComp,
|
||||||
expire_timer = ETimer}) ->
|
expire_timer = ETimer}) ->
|
||||||
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
|
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
|
||||||
%cancel timeout timer
|
|
||||||
erlang:cancel_timer(ETimer),
|
%% kick old client...
|
||||||
|
if
|
||||||
|
OldClientPid =:= undefined ->
|
||||||
|
ok;
|
||||||
|
OldClientPid =:= ClientPid ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
|
||||||
|
unlink(OldClientPid),
|
||||||
|
OldClientPid ! {stop, duplicate_id, ClientPid}
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% cancel timeout timer
|
||||||
|
emqttd_utils:cancel_timer(ETimer),
|
||||||
|
|
||||||
%% redelivery PUBREL
|
%% redelivery PUBREL
|
||||||
lists:foreach(fun(PacketId) ->
|
lists:foreach(fun(PacketId) ->
|
||||||
|
@ -348,17 +354,6 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) ->
|
||||||
lager:warning("Session ~s destroyed", [ClientId]),
|
lager:warning("Session ~s destroyed", [ClientId]),
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, State) ->
|
|
||||||
lager:error("Cannot resume session ~p with pid ~p: ~p",
|
|
||||||
[ClientId, ClientPid, State]),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast({'DOWN', ClientPid, Reason}, State = #session_state{clientid = ClientId,
|
|
||||||
client_pid = ClientPid}) ->
|
|
||||||
lager:error("Session: client ~s@~p is down for ~p", [ClientId, ClientPid, Reason]),
|
|
||||||
unlink(ClientPid),
|
|
||||||
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
|
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -375,8 +370,8 @@ handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = Clien
|
||||||
lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
|
lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
|
||||||
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid, _Reason}, State = #session_state{client_pid = OtherClientPid}) ->
|
handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) ->
|
||||||
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid, OtherClientPid]),
|
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
|
handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
|
||||||
|
|
Loading…
Reference in New Issue