diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 379614826..f1209dd6f 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -334,7 +334,18 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid) end. -takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> +takeover_session(ClientId, Pid) -> + try do_takeover_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + emqx_persistent_session:lookup(ClientId); + _ : {noproc, _} -> % emqx_connection: gen_server:call + emqx_persistent_session:lookup(ClientId); + _ : {'EXIT', {noproc, _}} -> % rpc_call/3 + emqx_persistent_session:lookup(ClientId) + end. + +do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> emqx_persistent_session:lookup(ClientId); @@ -343,7 +354,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {living, ConnMod, ChanPid, Session} end; -takeover_session(ClientId, ChanPid) -> +do_takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index bce7f0202..20dc433f8 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -300,17 +300,56 @@ t_discard_session_race(_) -> t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, none = emqx_cm:takeover_session(<<"clientid">>), + Parent = self(), erlang:spawn_link(fun() -> ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), + Parent ! registered, receive {'$gen_call', From, {takeover, 'begin'}} -> gen_server:reply(From, test), ok end end), - timer:sleep(100), + receive registered -> ok end, {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). +t_takeover_session_process_gone(_) -> + #{conninfo := ConnInfo} = ?ChanInfo, + ClientIDTcp = <<"clientidTCP">>, + ClientIDWs = <<"clientidWs">>, + ClientIDRpc = <<"clientidRPC">>, + none = emqx_cm:takeover_session(ClientIDTcp), + none = emqx_cm:takeover_session(ClientIDWs), + meck:new(emqx_connection, [passthrough, no_history]), + meck:expect(emqx_connection, call, + fun(Pid, {takeover, 'begin'}, _) -> + exit({noproc, {gen_server,call,[Pid, takeover_session]}}); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDTcp, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDTcp), + meck:expect(emqx_connection, call, + fun(_Pid, {takeover, 'begin'}, _) -> + exit(noproc); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDWs, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDWs), + meck:expect(emqx_connection, call, + fun(Pid, {takeover, 'begin'}, _) -> + exit({'EXIT', {noproc, {gen_server,call,[Pid, takeover_session]}}}); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDRpc, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDRpc), + emqx_cm:unregister_channel(ClientIDTcp), + emqx_cm:unregister_channel(ClientIDWs), + emqx_cm:unregister_channel(ClientIDRpc), + meck:unload(emqx_connection). + t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())).