Merge pull request #7124 from HJianBo/force-kill-takover-failure-proc

refactor(cm): rename takeover -> request_stepdown
This commit is contained in:
JianBo He 2022-02-24 17:46:09 +08:00 committed by GitHub
commit 456fcd6a54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 53 deletions

View File

@ -235,7 +235,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
case takeover('end', ConnMod, ChanPid) of case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} -> {ok, Pendings} ->
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
@ -278,7 +278,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
case takeover('begin', ConnMod, ChanPid) of case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} -> {ok, Session} ->
{ok, ConnMod, ChanPid, Session}; {ok, ConnMod, ChanPid, Session};
{error, Reason} -> {error, Reason} ->
@ -300,22 +300,22 @@ discard_session(ClientId) when is_binary(ClientId) ->
%% If failed to response (e.g. timeout) force a kill. %% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception %% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody. %% benefits nobody.
-spec takeover(Action, module(), pid()) -spec request_stepdown(Action, module(), pid())
-> ok -> ok
| {ok, emqx_session:session() | list(emqx_type:deliver())} | {ok, emqx_session:session() | list(emqx_type:deliver())}
| {error, term()} | {error, term()}
when Action :: kick | discard | 'begin' | 'end'. when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
takeover(Action, ConnMod, Pid) -> request_stepdown(Action, ConnMod, Pid) ->
{NAction, Timeout} = Timeout =
case Action == kick orelse Action == discard of case Action == kick orelse Action == discard of
true -> {Action, ?T_KICK}; true -> ?T_KICK;
_ -> {{takeover, Action},?T_TAKEOVER} _ -> ?T_TAKEOVER
end, end,
Return = Return =
%% this is essentailly a gen_server:call implemented in emqx_connection %% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection. %% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel %% the handle_call is implemented in emqx_channel
try apply(ConnMod, call, [Pid, NAction, Timeout]) of try apply(ConnMod, call, [Pid, Action, Timeout]) of
ok -> ok; ok -> ok;
Reply -> {ok, Reply} Reply -> {ok, Reply}
catch catch
@ -332,7 +332,7 @@ takeover(Action, ConnMod, Pid) ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
{error, Reason}; {error, Reason};
_ : {timeout, {gen_server, call, _}} -> _ : {timeout, {gen_server, call, _}} ->
?tp(warning, "takeover_session_timeout", ?tp(warning, "session_stepdown_request_timeout",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
stale_channel => stale_channel_info(Pid) stale_channel => stale_channel_info(Pid)
@ -340,7 +340,7 @@ takeover(Action, ConnMod, Pid) ->
ok = force_kill(Pid), ok = force_kill(Pid),
{error, timeout}; {error, timeout};
_ : Error : St -> _ : Error : St ->
?tp(error, "takeover_session_exception", ?tp(error, "session_stepdown_request_exception",
#{pid => Pid, #{pid => Pid,
action => Action, action => Action,
reason => Error, reason => Error,
@ -375,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered %% already deregistered
ok; ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ok = takeover(Action, ConnMod, ChanPid) ok = request_stepdown(Action, ConnMod, ChanPid)
end; end;
kick_session(Action, ClientId, ChanPid) -> kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded %% call remote node on the old APIs because we do not know if they have upgraded

View File

@ -187,67 +187,67 @@ t_open_session_race_condition(_) ->
ok = flush_emqx_pool(), ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_call_session_discard_normal(_) -> t_stepdown_sessiondiscard_normal(_) ->
test_call_session(discard, normal). test_stepdown_session(discard, normal).
t_call_session_discard_shutdown(_) -> t_stepdown_sessiondiscard_shutdown(_) ->
test_call_session(discard, shutdown). test_stepdown_session(discard, shutdown).
t_call_session_discard_shutdown_with_reason(_) -> t_stepdown_sessiondiscard_shutdown_with_reason(_) ->
test_call_session(discard, {shutdown, discard}). test_stepdown_session(discard, {shutdown, discard}).
t_call_session_discard_timeout(_) -> t_stepdown_sessiondiscard_timeout(_) ->
test_call_session(discard, timeout). test_stepdown_session(discard, timeout).
t_call_session_discard_noproc(_) -> t_stepdown_sessiondiscard_noproc(_) ->
test_call_session(discard, noproc). test_stepdown_session(discard, noproc).
t_call_session_kick_normal(_) -> t_stepdown_sessionkick_normal(_) ->
test_call_session(kick, normal). test_stepdown_session(kick, normal).
t_call_session_kick_shutdown(_) -> t_stepdown_sessionkick_shutdown(_) ->
test_call_session(kick, shutdown). test_stepdown_session(kick, shutdown).
t_call_session_kick_shutdown_with_reason(_) -> t_stepdown_sessionkick_shutdown_with_reason(_) ->
test_call_session(kick, {shutdown, discard}). test_stepdown_session(kick, {shutdown, discard}).
t_call_session_kick_timeout(_) -> t_stepdown_sessionkick_timeout(_) ->
test_call_session(kick, timeout). test_stepdown_session(kick, timeout).
t_call_session_kick_noproc(_) -> t_stepdown_sessionkick_noproc(_) ->
test_call_session(discard, noproc). test_stepdown_session(discard, noproc).
t_call_session_takeover_begin_normal(_) -> t_stepdown_sessiontakeover_begin_normal(_) ->
test_call_session({takeover, 'begin'}, normal). test_stepdown_session({takeover, 'begin'}, normal).
t_call_session_takeover_begin_shutdown(_) -> t_stepdown_sessiontakeover_begin_shutdown(_) ->
test_call_session({takeover, 'begin'}, shutdown). test_stepdown_session({takeover, 'begin'}, shutdown).
t_call_session_takeover_begin_shutdown_with_reason(_) -> t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) ->
test_call_session({takeover, 'begin'}, {shutdown, discard}). test_stepdown_session({takeover, 'begin'}, {shutdown, discard}).
t_call_session_takeover_begin_timeout(_) -> t_stepdown_sessiontakeover_begin_timeout(_) ->
test_call_session({takeover, 'begin'}, timeout). test_stepdown_session({takeover, 'begin'}, timeout).
t_call_session_takeover_begin_noproc(_) -> t_stepdown_sessiontakeover_begin_noproc(_) ->
test_call_session({takeover, 'begin'}, noproc). test_stepdown_session({takeover, 'begin'}, noproc).
t_call_session_takeover_end_normal(_) -> t_stepdown_sessiontakeover_end_normal(_) ->
test_call_session({takeover, 'end'}, normal). test_stepdown_session({takeover, 'end'}, normal).
t_call_session_takeover_end_shutdown(_) -> t_stepdown_sessiontakeover_end_shutdown(_) ->
test_call_session({takeover, 'end'}, shutdown). test_stepdown_session({takeover, 'end'}, shutdown).
t_call_session_takeover_end_shutdown_with_reason(_) -> t_stepdown_sessiontakeover_end_shutdown_with_reason(_) ->
test_call_session({takeover, 'end'}, {shutdown, discard}). test_stepdown_session({takeover, 'end'}, {shutdown, discard}).
t_call_session_takeover_end_timeout(_) -> t_stepdown_sessiontakeover_end_timeout(_) ->
test_call_session({takeover, 'end'}, timeout). test_stepdown_session({takeover, 'end'}, timeout).
t_call_session_takeover_end_noproc(_) -> t_stepdown_sessiontakeover_end_noproc(_) ->
test_call_session({takeover, 'end'}, noproc). test_stepdown_session({takeover, 'end'}, noproc).
test_call_session(Action, Reason) -> test_stepdown_session(Action, Reason) ->
ClientId = rand_client_id(), ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo, #{conninfo := ConnInfo} = ?ChanInfo,
FakeSessionFun = FakeSessionFun =