diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 23c47660c..1bb0d5005 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -262,6 +262,14 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> + CreateSess = + fun() -> + Session = create_session(ClientInfo, ConnInfo), + Session1 = emqx_persistent_session:persist( + ClientInfo,ConnInfo, Session), + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session1, present => false}} + end, case takeover_session(ClientId) of {persistent, Session} -> %% This is a persistent session without a managing process. @@ -274,15 +282,20 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> pendings => Pendings}}; {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Session1 = emqx_persistent_session:persist( ClientInfo - , ConnInfo - , Session - ), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session1, - present => true, - pendings => Pendings}}; + case request_stepdown( + {takeover, 'end'}, + ConnMod, + ChanPid) of + {ok, Pendings} -> + Session1 = emqx_persistent_session:persist( + ClientInfo, ConnInfo, Session), + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session1, + present => true, + pendings => Pendings}}; + {error, _} -> + CreateSess() + end; {expired, OldSession} -> _ = emqx_persistent_session:discard(ClientId, OldSession), Session = create_session(ClientInfo, ConnInfo), @@ -293,13 +306,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session1, present => false}}; none -> - Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist( ClientInfo - , ConnInfo - , Session - ), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session1, present => false}} + CreateSess() end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -359,9 +366,9 @@ takeover_session(ClientId) -> takeover_session(ClientId, Pid) -> try do_takeover_session(ClientId, Pid) catch - _ : noproc -> % emqx_ws_connection: call - emqx_persistent_session:lookup(ClientId); - _ : {noproc, _} -> % emqx_connection: gen_server:call + _ : R when R == noproc; + R == timeout; + R == unexpected_exception -> %% request_stepdown/3 emqx_persistent_session:lookup(ClientId); _ : {'EXIT', {noproc, _}} -> % rpc_call/3 emqx_persistent_session:lookup(ClientId) @@ -372,9 +379,12 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> emqx_persistent_session:lookup(ClientId); ConnMod when is_atom(ConnMod) -> - %% TODO: if takeover times out, maybe kill the old? - Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {living, ConnMod, ChanPid, Session} + case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of + {ok, Session} -> + {living, ConnMod, ChanPid, Session}; + {error, Reason} -> + error(Reason) + end end; do_takeover_session(ClientId, ChanPid) -> wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)). @@ -391,31 +401,52 @@ discard_session(ClientId) when is_binary(ClientId) -> %% If failed to kick (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec kick_or_kill(kick | discard, module(), pid()) -> ok. -kick_or_kill(Action, ConnMod, Pid) -> - try +-spec request_stepdown(Action, module(), pid()) + -> ok + | {ok, emqx_session:session() | list(emqx_type:deliver())} + | {error, term()} + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +request_stepdown(Action, ConnMod, Pid) -> + Timeout = + case Action == kick orelse Action == discard of + true -> ?T_KICK; + _ -> ?T_TAKEOVER + end, + Return = %% this is essentially a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) - catch - _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", - #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}), - ok = force_kill(Pid); - _ : Error : St -> - ?tp(error, "session_kick_exception", - #{pid => Pid, action => Action, reason => Error, stacktrace => St, - stale_channel => stale_channel_info(Pid)}), - ok = force_kill(Pid) + try apply(ConnMod, call, [Pid, Action, Timeout]) of + ok -> ok; + Reply -> {ok, Reply} + catch + _ : noproc -> % emqx_ws_connection: call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "session_stepdown_request_timeout", + #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}), + ok = force_kill(Pid), + {error, timeout}; + _ : Error : St -> + ?tp(error, "session_stepdown_request_exception", + #{pid => Pid, action => Action, reason => Error, stacktrace => St, + stale_channel => stale_channel_info(Pid)}), + ok = force_kill(Pid), + {error, unexpected_exception} + end, + case Action == kick orelse Action == discard of + true -> ok; + _ -> Return end. force_kill(Pid) -> @@ -438,7 +469,7 @@ do_kick_session(Action, ClientId, ChanPid) -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = kick_or_kill(Action, ConnMod, ChanPid) + ok = request_stepdown(Action, ConnMod, ChanPid) end. %% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index f28fd6bd8..2ea2c77ab 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -190,45 +190,77 @@ t_open_session_race_condition(_) -> ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> - test_kick_session(discard, normal). + test_stepdown_session(discard, normal). t_kick_session_discard_shutdown(_) -> - test_kick_session(discard, shutdown). + test_stepdown_session(discard, shutdown). t_kick_session_discard_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). + test_stepdown_session(discard, {shutdown, discard}). t_kick_session_discard_timeout(_) -> - test_kick_session(discard, timeout). + test_stepdown_session(discard, timeout). t_kick_session_discard_noproc(_) -> - test_kick_session(discard, noproc). + test_stepdown_session(discard, noproc). t_kick_session_kick_normal(_) -> - test_kick_session(discard, normal). + test_stepdown_session(kick, normal). t_kick_session_kick_shutdown(_) -> - test_kick_session(discard, shutdown). + test_stepdown_session(kick, shutdown). t_kick_session_kick_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). + test_stepdown_session(kick, {shutdown, kicked}). t_kick_session_kick_timeout(_) -> - test_kick_session(discard, timeout). + test_stepdown_session(kick, timeout). t_kick_session_kick_noproc(_) -> - test_kick_session(discard, noproc). + test_stepdown_session(kick, noproc). -test_kick_session(Action, Reason) -> +t_stepdown_session_takeover_begin_normal(_) -> + test_stepdown_session({takeover, 'begin'}, normal). + +t_stepdown_session_takeover_begin_shutdown(_) -> + test_stepdown_session({takeover, 'begin'}, shutdown). + +t_stepdown_session_takeover_begin_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'begin'}, {shutdown, kicked}). + +t_stepdown_session_takeover_begin_timeout(_) -> + test_stepdown_session({takeover, 'begin'}, timeout). + +t_stepdown_session_takeover_begin_noproc(_) -> + test_stepdown_session({takeover, 'begin'}, noproc). + +t_stepdown_session_takeover_end_normal(_) -> + test_stepdown_session({takeover, 'end'}, normal). + +t_stepdown_session_takeover_end_shutdown(_) -> + test_stepdown_session({takeover, 'end'}, shutdown). + +t_stepdown_session_takeover_end_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'end'}, {shutdown, kicked}). + +t_stepdown_session_takeover_end_timeout(_) -> + test_stepdown_session({takeover, 'end'}, timeout). + +t_stepdown_session_takeover_end_noproc(_) -> + test_stepdown_session({takeover, 'end'}, noproc). + +test_stepdown_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, FakeSessionFun = fun Loop() -> receive {'$gen_call', From, A} when A =:= kick orelse - A =:= discard -> + A =:= discard orelse + A =:= {takeover, 'begin'} orelse + A =:= {takeover, 'end'} -> case Reason of - normal -> + normal when A =:= kick orelse A =:= discard -> gen_server:reply(From, ok); timeout -> %% no response to the call @@ -253,7 +285,8 @@ test_kick_session(Action, Reason) -> end, ok = case Action of kick -> emqx_cm:kick_session(ClientId); - discard -> emqx_cm:discard_session(ClientId) + discard -> emqx_cm:discard_session(ClientId); + {takeover, _} -> none = emqx_cm:takeover_session(ClientId), ok end, case Reason =:= timeout orelse Reason =:= noproc of true ->