From bfd0fd901932ee23d7aba0d7b4a986f208bafaf9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 22 Feb 2022 10:16:29 +0800 Subject: [PATCH] refactor(cm): rename call_or_kill to takeover --- src/emqx_cm.erl | 117 +++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4811313d8..002b1a746 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -235,13 +235,14 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of - {error, _} -> CreateSess(); - Pendings -> + case takeover('end', ConnMod, ChanPid) of + {ok, Pendings} -> register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, - pendings => Pendings}} + pendings => Pendings}}; + {error, _} -> + CreateSess() end; {error, _Reason} -> CreateSess() end @@ -277,11 +278,11 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of + case takeover('begin', ConnMod, ChanPid) of + {ok, Session} -> + {ok, ConnMod, ChanPid, Session}; {error, Reason} -> - {error, Reason}; - Session -> - {ok, ConnMod, ChanPid, Session} + {error, Reason} end end; takeover_session(ClientId, ChanPid) -> @@ -299,59 +300,61 @@ discard_session(ClientId) when is_binary(ClientId) -> %% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec call_or_kill(Action, module(), pid()) - -> ok %% returned by kick, discard - | emqx_session:session() %% returned by {takeover, 'begin'} - | list(emqx_type:deliver()) %% returned by {takeover, 'end'} - when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. -call_or_kill(Action, ConnMod, Pid) -> - try - Timeout = case Action of - {takeover, _} -> ?T_TAKEOVER; - _ -> ?T_KICK - end, +-spec takeover(Action, module(), pid()) + -> ok + | {ok, emqx_session:session() | list(emqx_type:deliver())} + | {error, term()} + when Action :: kick | discard | 'begin' | 'end'. +takeover(Action, ConnMod, Pid) -> + {NAction, Timeout} = + case Action == kick orelse Action == discard of + true -> {Action, ?T_KICK}; + _ -> {{takeover, Action},?T_TAKEOVER} + end, + Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - apply(ConnMod, call, [Pid, Action, Timeout]) - catch - _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, noproc); - _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, noproc); - _ : Reason = {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, Reason); - _ : Reason = {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), - return_error_if_action_is_takeover(Action, Reason); - _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "call_session_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid), - return_error_if_action_is_takeover(Action, timeout); - _ : Error : St -> - ?tp(error, "call_session_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid), - return_error_if_action_is_takeover(Action, Error) + try apply(ConnMod, call, [Pid, NAction, Timeout]) of + ok -> ok; + Reply -> {ok, Reply} + catch + _ : noproc -> % emqx_ws_connection: call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "takeover_session_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, timeout}; + _ : Error : St -> + ?tp(error, "takeover_session_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, Error} + end, + case Action == kick orelse Action == discard of + true -> ok; + _ -> Return end. -return_error_if_action_is_takeover({takeover, _}, Reason) -> - {error, Reason}; -return_error_if_action_is_takeover(_, _) -> - ok. - force_kill(Pid) -> exit(Pid, kill), ok. @@ -372,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = call_or_kill(Action, ConnMod, ChanPid) + ok = takeover(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded