From 05f3bc8c905a2eed6c1bae3b10b1075eaab4b3ec Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Tue, 16 Nov 2021 13:20:23 +0100 Subject: [PATCH] fix(emqx_cm): make takeover session less likely to hit a race --- apps/emqx/src/emqx_cm.erl | 18 ++++++++++++++-- apps/emqx/test/emqx_cm_SUITE.erl | 37 ++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 379614826..bf57e1ade 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -334,7 +334,21 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid) end. -takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> +takeover_session(ClientId, Pid) -> + try do_takeover_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + ?tp(debug, "session_gone", #{pid => Pid}), + emqx_persistent_session:lookup(ClientId); + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?tp(debug, "session_gone", #{pid => Pid}), + emqx_persistent_session:lookup(ClientId); + _ : {'EXIT', {noproc, _}} -> % rpc_call/3 + ?tp(debug, "session_gone", #{pid => Pid}), + emqx_persistent_session:lookup(ClientId) + end. + +do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> emqx_persistent_session:lookup(ClientId); @@ -343,7 +357,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {living, ConnMod, ChanPid, Session} end; -takeover_session(ClientId, ChanPid) -> +do_takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index bce7f0202..8d017ec6e 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -311,6 +311,43 @@ t_takeover_session(_) -> {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). +t_takeover_session_process_gone(_) -> + #{conninfo := ConnInfo} = ?ChanInfo, + ClientIDTcp = <<"clientidTCP">>, + ClientIDWs = <<"clientidWs">>, + ClientIDRpc = <<"clientidRPC">>, + none = emqx_cm:takeover_session(ClientIDTcp), + none = emqx_cm:takeover_session(ClientIDWs), + meck:new(emqx_connection, [passthrough, no_history]), + meck:expect(emqx_connection, call, + fun(Pid, {takeover, 'begin'}, _) -> + exit({noproc, {gen_server,call,[Pid, takeover_session]}}); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDTcp, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDTcp), + meck:expect(emqx_connection, call, + fun(_Pid, {takeover, 'begin'}, _) -> + exit(noproc); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDWs, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDWs), + meck:expect(emqx_connection, call, + fun(Pid, {takeover, 'begin'}, _) -> + exit({'EXIT', {noproc, {gen_server,call,[Pid, takeover_session]}}}); + (Pid, What, Args) -> + meck:passthrough([Pid, What, Args]) + end), + ok = emqx_cm:register_channel(ClientIDRpc, self(), ConnInfo), + none = emqx_cm:takeover_session(ClientIDRpc), + emqx_cm:unregister_channel(ClientIDTcp), + emqx_cm:unregister_channel(ClientIDWs), + emqx_cm:unregister_channel(ClientIDRpc), + meck:unload(emqx_connection). + t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())).