Merge pull request #6183 from emqx/fix-takeover-race

Fix takeover race
This commit is contained in:
Tobias Lindahl 2021-11-16 15:00:12 +01:00 committed by GitHub
commit 17547970c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 3 deletions

View File

@ -334,7 +334,18 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid)
end.
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
takeover_session(ClientId, Pid) ->
try do_takeover_session(ClientId, Pid)
catch
_ : noproc -> % emqx_ws_connection: call
emqx_persistent_session:lookup(ClientId);
_ : {noproc, _} -> % emqx_connection: gen_server:call
emqx_persistent_session:lookup(ClientId);
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
emqx_persistent_session:lookup(ClientId)
end.
do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
emqx_persistent_session:lookup(ClientId);
@ -343,7 +354,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{living, ConnMod, ChanPid, Session}
end;
takeover_session(ClientId, ChanPid) ->
do_takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
%% @doc Discard all the sessions identified by the ClientId.

View File

@ -300,17 +300,56 @@ t_discard_session_race(_) ->
t_takeover_session(_) ->
#{conninfo := ConnInfo} = ?ChanInfo,
none = emqx_cm:takeover_session(<<"clientid">>),
Parent = self(),
erlang:spawn_link(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
Parent ! registered,
receive
{'$gen_call', From, {takeover, 'begin'}} ->
gen_server:reply(From, test), ok
end
end),
timer:sleep(100),
receive registered -> ok end,
{living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
emqx_cm:unregister_channel(<<"clientid">>).
t_takeover_session_process_gone(_) ->
#{conninfo := ConnInfo} = ?ChanInfo,
ClientIDTcp = <<"clientidTCP">>,
ClientIDWs = <<"clientidWs">>,
ClientIDRpc = <<"clientidRPC">>,
none = emqx_cm:takeover_session(ClientIDTcp),
none = emqx_cm:takeover_session(ClientIDWs),
meck:new(emqx_connection, [passthrough, no_history]),
meck:expect(emqx_connection, call,
fun(Pid, {takeover, 'begin'}, _) ->
exit({noproc, {gen_server,call,[Pid, takeover_session]}});
(Pid, What, Args) ->
meck:passthrough([Pid, What, Args])
end),
ok = emqx_cm:register_channel(ClientIDTcp, self(), ConnInfo),
none = emqx_cm:takeover_session(ClientIDTcp),
meck:expect(emqx_connection, call,
fun(_Pid, {takeover, 'begin'}, _) ->
exit(noproc);
(Pid, What, Args) ->
meck:passthrough([Pid, What, Args])
end),
ok = emqx_cm:register_channel(ClientIDWs, self(), ConnInfo),
none = emqx_cm:takeover_session(ClientIDWs),
meck:expect(emqx_connection, call,
fun(Pid, {takeover, 'begin'}, _) ->
exit({'EXIT', {noproc, {gen_server,call,[Pid, takeover_session]}}});
(Pid, What, Args) ->
meck:passthrough([Pid, What, Args])
end),
ok = emqx_cm:register_channel(ClientIDRpc, self(), ConnInfo),
none = emqx_cm:takeover_session(ClientIDRpc),
emqx_cm:unregister_channel(ClientIDTcp),
emqx_cm:unregister_channel(ClientIDWs),
emqx_cm:unregister_channel(ClientIDRpc),
meck:unload(emqx_connection).
t_all_channels(_) ->
?assertEqual(true, is_list(emqx_cm:all_channels())).