refactor(cm): rename takeover -> request_stepdown

see: https://github.com/emqx/emqx/pull/7026#pullrequestreview-891954135
This commit is contained in:
JianBo He 2022-02-24 14:43:46 +08:00
parent d943cc2f1c
commit d622eb848f
2 changed files with 18 additions and 18 deletions

View File

@ -1,22 +1,22 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[4-5]">>, [{<<"4\\.3\\.[4-5]$">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-3]">>, {<<"4\\.3\\.[2-3]$">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>, {<<"4\\.3\\.[0-1]$">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[4-5]">>, [{<<"4\\.3\\.[4-5]$">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-3]">>, {<<"4\\.3\\.[2-3]$">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>, {<<"4\\.3\\.[0-1]$">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},

View File

@ -235,7 +235,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
case takeover('end', ConnMod, ChanPid) of case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} -> {ok, Pendings} ->
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
@ -278,7 +278,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
case takeover('begin', ConnMod, ChanPid) of case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} -> {ok, Session} ->
{ok, ConnMod, ChanPid, Session}; {ok, ConnMod, ChanPid, Session};
{error, Reason} -> {error, Reason} ->
@ -300,22 +300,22 @@ discard_session(ClientId) when is_binary(ClientId) ->
%% If failed to response (e.g. timeout) force a kill. %% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception %% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody. %% benefits nobody.
-spec takeover(Action, module(), pid()) -spec request_stepdown(Action, module(), pid())
-> ok -> ok
| {ok, emqx_session:session() | list(emqx_type:deliver())} | {ok, emqx_session:session() | list(emqx_type:deliver())}
| {error, term()} | {error, term()}
when Action :: kick | discard | 'begin' | 'end'. when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
takeover(Action, ConnMod, Pid) -> request_stepdown(Action, ConnMod, Pid) ->
{NAction, Timeout} = Timeout =
case Action == kick orelse Action == discard of case Action == kick orelse Action == discard of
true -> {Action, ?T_KICK}; true -> ?T_KICK;
_ -> {{takeover, Action},?T_TAKEOVER} _ -> ?T_TAKEOVER
end, end,
Return = Return =
%% this is essentailly a gen_server:call implemented in emqx_connection %% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection. %% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel %% the handle_call is implemented in emqx_channel
try apply(ConnMod, call, [Pid, NAction, Timeout]) of try apply(ConnMod, call, [Pid, Action, Timeout]) of
ok -> ok; ok -> ok;
Reply -> {ok, Reply} Reply -> {ok, Reply}
catch catch
@ -332,7 +332,7 @@ takeover(Action, ConnMod, Pid) ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
{error, Reason}; {error, Reason};
_ : {timeout, {gen_server, call, _}} -> _ : {timeout, {gen_server, call, _}} ->
?tp(warning, "takeover_session_timeout", ?tp(warning, "session_stepdown_request_timeout",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
stale_channel => stale_channel_info(Pid) stale_channel => stale_channel_info(Pid)
@ -340,7 +340,7 @@ takeover(Action, ConnMod, Pid) ->
ok = force_kill(Pid), ok = force_kill(Pid),
{error, timeout}; {error, timeout};
_ : Error : St -> _ : Error : St ->
?tp(error, "takeover_session_exception", ?tp(error, "session_stepdown_request_exception",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
reason => Error, reason => Error,
@ -375,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered %% already deregistered
ok; ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ok = takeover(Action, ConnMod, ChanPid) ok = request_stepdown(Action, ConnMod, ChanPid)
end; end;
kick_session(Action, ClientId, ChanPid) -> kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded %% call remote node on the old APIs because we do not know if they have upgraded