diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 23f078568..4811313d8 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -226,18 +226,24 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> - case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, - present => true, - pendings => Pendings}}; - {error, not_found} -> + CreateSess = + fun() -> Session = create_session(ClientInfo, ConnInfo), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} + end, + case takeover_session(ClientId) of + {ok, ConnMod, ChanPid, Session} -> + ok = emqx_session:resume(ClientInfo, Session), + case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of + {error, _} -> CreateSess(); + Pendings -> + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session, + present => true, + pendings => Pendings}} + end; + {error, _Reason} -> CreateSess() end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -271,9 +277,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - %% TODO: if takeover times out, maybe kill the old? - Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {ok, ConnMod, ChanPid, Session} + case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of + {error, Reason} -> + {error, Reason}; + Session -> + {ok, ConnMod, ChanPid, Session} + end end; takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). @@ -286,44 +295,63 @@ discard_session(ClientId) when is_binary(ClientId) -> ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -%% @private Kick a local stale session to force it step down. -%% If failed to kick (e.g. timeout) force a kill. +%% @private call a local stale session to execute an Action. +%% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec kick_or_kill(kick | discard, module(), pid()) -> ok. -kick_or_kill(Action, ConnMod, Pid) -> +-spec call_or_kill(Action, module(), pid()) + -> ok %% returned by kick, discard + | emqx_session:session() %% returned by {takeover, 'begin'} + | list(emqx_type:deliver()) %% returned by {takeover, 'end'} + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +call_or_kill(Action, ConnMod, Pid) -> try + Timeout = case Action of + {takeover, _} -> ?T_TAKEOVER; + _ -> ?T_KICK + end, %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) + apply(ConnMod, call, [Pid, Action, Timeout]) catch _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, noproc); _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, noproc); + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, Reason); + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + return_error_if_action_is_takeover(Action, Reason); _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", + ?tp(warning, "call_session_timeout", #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid) }), - ok = force_kill(Pid); + ok = force_kill(Pid), + return_error_if_action_is_takeover(Action, timeout); _ : Error : St -> - ?tp(error, "session_kick_exception", + ?tp(error, "call_session_exception", #{pid => Pid, action => Action, reason => Error, stacktrace => St, stale_channel => stale_channel_info(Pid) }), - ok = force_kill(Pid) + ok = force_kill(Pid), + return_error_if_action_is_takeover(Action, Error) end. +return_error_if_action_is_takeover({takeover, _}, Reason) -> + {error, Reason}; +return_error_if_action_is_takeover(_, _) -> + ok. + force_kill(Pid) -> exit(Pid, kill), ok. @@ -344,7 +372,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = kick_or_kill(Action, ConnMod, ChanPid) + ok = call_or_kill(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded