refactor(cm): rename call_or_kill to takeover

This commit is contained in:
JianBo He 2022-02-22 10:16:29 +08:00
parent 66807f17df
commit bfd0fd9019
1 changed files with 60 additions and 57 deletions

View File

@ -235,13 +235,14 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of case takeover('end', ConnMod, ChanPid) of
{error, _} -> CreateSess(); {ok, Pendings} ->
Pendings ->
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
present => true, present => true,
pendings => Pendings}} pendings => Pendings}};
{error, _} ->
CreateSess()
end; end;
{error, _Reason} -> CreateSess() {error, _Reason} -> CreateSess()
end end
@ -277,11 +278,11 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of case takeover('begin', ConnMod, ChanPid) of
{ok, Session} ->
{ok, ConnMod, ChanPid, Session};
{error, Reason} -> {error, Reason} ->
{error, Reason}; {error, Reason}
Session ->
{ok, ConnMod, ChanPid, Session}
end end
end; end;
takeover_session(ClientId, ChanPid) -> takeover_session(ClientId, ChanPid) ->
@ -299,44 +300,47 @@ discard_session(ClientId) when is_binary(ClientId) ->
%% If failed to response (e.g. timeout) force a kill. %% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception %% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody. %% benefits nobody.
-spec call_or_kill(Action, module(), pid()) -spec takeover(Action, module(), pid())
-> ok %% returned by kick, discard -> ok
| emqx_session:session() %% returned by {takeover, 'begin'} | {ok, emqx_session:session() | list(emqx_type:deliver())}
| list(emqx_type:deliver()) %% returned by {takeover, 'end'} | {error, term()}
when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. when Action :: kick | discard | 'begin' | 'end'.
call_or_kill(Action, ConnMod, Pid) -> takeover(Action, ConnMod, Pid) ->
try {NAction, Timeout} =
Timeout = case Action of case Action == kick orelse Action == discard of
{takeover, _} -> ?T_TAKEOVER; true -> {Action, ?T_KICK};
_ -> ?T_KICK _ -> {{takeover, Action},?T_TAKEOVER}
end, end,
Return =
%% this is essentailly a gen_server:call implemented in emqx_connection %% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection. %% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel %% the handle_call is implemented in emqx_channel
apply(ConnMod, call, [Pid, Action, Timeout]) try apply(ConnMod, call, [Pid, NAction, Timeout]) of
ok -> ok;
Reply -> {ok, Reply}
catch catch
_ : noproc -> % emqx_ws_connection: call _ : noproc -> % emqx_ws_connection: call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
return_error_if_action_is_takeover(Action, noproc); {error, noproc};
_ : {noproc, _} -> % emqx_connection: gen_server:call _ : {noproc, _} -> % emqx_connection: gen_server:call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
return_error_if_action_is_takeover(Action, noproc); {error, noproc};
_ : Reason = {shutdown, _} -> _ : Reason = {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
return_error_if_action_is_takeover(Action, Reason); {error, Reason};
_ : Reason = {{shutdown, _}, _} -> _ : Reason = {{shutdown, _}, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
return_error_if_action_is_takeover(Action, Reason); {error, Reason};
_ : {timeout, {gen_server, call, _}} -> _ : {timeout, {gen_server, call, _}} ->
?tp(warning, "call_session_timeout", ?tp(warning, "takeover_session_timeout",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
stale_channel => stale_channel_info(Pid) stale_channel => stale_channel_info(Pid)
}), }),
ok = force_kill(Pid), ok = force_kill(Pid),
return_error_if_action_is_takeover(Action, timeout); {error, timeout};
_ : Error : St -> _ : Error : St ->
?tp(error, "call_session_exception", ?tp(error, "takeover_session_exception",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
reason => Error, reason => Error,
@ -344,14 +348,13 @@ call_or_kill(Action, ConnMod, Pid) ->
stale_channel => stale_channel_info(Pid) stale_channel => stale_channel_info(Pid)
}), }),
ok = force_kill(Pid), ok = force_kill(Pid),
return_error_if_action_is_takeover(Action, Error) {error, Error}
end,
case Action == kick orelse Action == discard of
true -> ok;
_ -> Return
end. end.
return_error_if_action_is_takeover({takeover, _}, Reason) ->
{error, Reason};
return_error_if_action_is_takeover(_, _) ->
ok.
force_kill(Pid) -> force_kill(Pid) ->
exit(Pid, kill), exit(Pid, kill),
ok. ok.
@ -372,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered %% already deregistered
ok; ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ok = call_or_kill(Action, ConnMod, ChanPid) ok = takeover(Action, ConnMod, ChanPid)
end; end;
kick_session(Action, ClientId, ChanPid) -> kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded %% call remote node on the old APIs because we do not know if they have upgraded