From d622eb848faa6dcf7d765896bded62dee92ce1ba Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 24 Feb 2022 14:43:46 +0800 Subject: [PATCH] refactor(cm): rename takeover -> request_stepdown see: https://github.com/emqx/emqx/pull/7026#pullrequestreview-891954135 --- apps/emqx_exproto/src/emqx_exproto.appup.src | 12 +++++----- src/emqx_cm.erl | 24 ++++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index e66e8a58c..adeeeab3f 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,22 +1,22 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[4-5]">>, + [{<<"4\\.3\\.[4-5]$">>, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[2-3]">>, + {<<"4\\.3\\.[2-3]$">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[0-1]">>, + {<<"4\\.3\\.[0-1]$">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[4-5]">>, + [{<<"4\\.3\\.[4-5]$">>, [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[2-3]">>, + {<<"4\\.3\\.[2-3]$">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[0-1]">>, + {<<"4\\.3\\.[0-1]$">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 002b1a746..3a12b5a24 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -235,7 +235,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - case takeover('end', ConnMod, ChanPid) of + case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of {ok, Pendings} -> register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, @@ -278,7 +278,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - case takeover('begin', ConnMod, ChanPid) of + case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> {ok, ConnMod, ChanPid, Session}; {error, Reason} -> @@ -300,22 +300,22 @@ discard_session(ClientId) when is_binary(ClientId) -> %% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec takeover(Action, module(), pid()) +-spec request_stepdown(Action, module(), pid()) -> ok | {ok, emqx_session:session() | list(emqx_type:deliver())} | {error, term()} - when Action :: kick | discard | 'begin' | 'end'. -takeover(Action, ConnMod, Pid) -> - {NAction, Timeout} = + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +request_stepdown(Action, ConnMod, Pid) -> + Timeout = case Action == kick orelse Action == discard of - true -> {Action, ?T_KICK}; - _ -> {{takeover, Action},?T_TAKEOVER} + true -> ?T_KICK; + _ -> ?T_TAKEOVER end, Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - try apply(ConnMod, call, [Pid, NAction, Timeout]) of + try apply(ConnMod, call, [Pid, Action, Timeout]) of ok -> ok; Reply -> {ok, Reply} catch @@ -332,7 +332,7 @@ takeover(Action, ConnMod, Pid) -> ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), {error, Reason}; _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "takeover_session_timeout", + ?tp(warning, "session_stepdown_request_timeout", #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid) @@ -340,7 +340,7 @@ takeover(Action, ConnMod, Pid) -> ok = force_kill(Pid), {error, timeout}; _ : Error : St -> - ?tp(error, "takeover_session_exception", + ?tp(error, "session_stepdown_request_exception", #{pid => Pid, action => Action, reason => Error, @@ -375,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = takeover(Action, ConnMod, ChanPid) + ok = request_stepdown(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded