diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 002b1a746..3a12b5a24 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -235,7 +235,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - case takeover('end', ConnMod, ChanPid) of + case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of {ok, Pendings} -> register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, @@ -278,7 +278,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - case takeover('begin', ConnMod, ChanPid) of + case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> {ok, ConnMod, ChanPid, Session}; {error, Reason} -> @@ -300,22 +300,22 @@ discard_session(ClientId) when is_binary(ClientId) -> %% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec takeover(Action, module(), pid()) +-spec request_stepdown(Action, module(), pid()) -> ok | {ok, emqx_session:session() | list(emqx_type:deliver())} | {error, term()} - when Action :: kick | discard | 'begin' | 'end'. -takeover(Action, ConnMod, Pid) -> - {NAction, Timeout} = + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +request_stepdown(Action, ConnMod, Pid) -> + Timeout = case Action == kick orelse Action == discard of - true -> {Action, ?T_KICK}; - _ -> {{takeover, Action},?T_TAKEOVER} + true -> ?T_KICK; + _ -> ?T_TAKEOVER end, Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. 
%% the handle_call is implemented in emqx_channel - try apply(ConnMod, call, [Pid, NAction, Timeout]) of + try apply(ConnMod, call, [Pid, Action, Timeout]) of ok -> ok; Reply -> {ok, Reply} catch @@ -332,7 +332,7 @@ takeover(Action, ConnMod, Pid) -> ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), {error, Reason}; _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "takeover_session_timeout", + ?tp(warning, "session_stepdown_request_timeout", #{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid) @@ -340,7 +340,7 @@ takeover(Action, ConnMod, Pid) -> ok = force_kill(Pid), {error, timeout}; _ : Error : St -> - ?tp(error, "takeover_session_exception", + ?tp(error, "session_stepdown_request_exception", #{pid => Pid, action => Action, reason => Error, @@ -375,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = takeover(Action, ConnMod, ChanPid) + ok = request_stepdown(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index c82e525f0..4fcaaf473 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -187,67 +187,67 @@ t_open_session_race_condition(_) -> ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_call_session_discard_normal(_) -> - test_call_session(discard, normal). +t_stepdown_sessiondiscard_normal(_) -> + test_stepdown_session(discard, normal). -t_call_session_discard_shutdown(_) -> - test_call_session(discard, shutdown). +t_stepdown_sessiondiscard_shutdown(_) -> + test_stepdown_session(discard, shutdown). -t_call_session_discard_shutdown_with_reason(_) -> - test_call_session(discard, {shutdown, discard}). 
+t_stepdown_sessiondiscard_shutdown_with_reason(_) -> + test_stepdown_session(discard, {shutdown, discard}). -t_call_session_discard_timeout(_) -> - test_call_session(discard, timeout). +t_stepdown_sessiondiscard_timeout(_) -> + test_stepdown_session(discard, timeout). -t_call_session_discard_noproc(_) -> - test_call_session(discard, noproc). +t_stepdown_sessiondiscard_noproc(_) -> + test_stepdown_session(discard, noproc). -t_call_session_kick_normal(_) -> - test_call_session(kick, normal). +t_stepdown_sessionkick_normal(_) -> + test_stepdown_session(kick, normal). -t_call_session_kick_shutdown(_) -> - test_call_session(kick, shutdown). +t_stepdown_sessionkick_shutdown(_) -> + test_stepdown_session(kick, shutdown). -t_call_session_kick_shutdown_with_reason(_) -> - test_call_session(kick, {shutdown, discard}). +t_stepdown_sessionkick_shutdown_with_reason(_) -> + test_stepdown_session(kick, {shutdown, discard}). -t_call_session_kick_timeout(_) -> - test_call_session(kick, timeout). +t_stepdown_sessionkick_timeout(_) -> + test_stepdown_session(kick, timeout). -t_call_session_kick_noproc(_) -> - test_call_session(discard, noproc). +t_stepdown_sessionkick_noproc(_) -> + test_stepdown_session(kick, noproc). -t_call_session_takeover_begin_normal(_) -> - test_call_session({takeover, 'begin'}, normal). +t_stepdown_sessiontakeover_begin_normal(_) -> + test_stepdown_session({takeover, 'begin'}, normal). -t_call_session_takeover_begin_shutdown(_) -> - test_call_session({takeover, 'begin'}, shutdown). +t_stepdown_sessiontakeover_begin_shutdown(_) -> + test_stepdown_session({takeover, 'begin'}, shutdown). -t_call_session_takeover_begin_shutdown_with_reason(_) -> - test_call_session({takeover, 'begin'}, {shutdown, discard}). +t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'begin'}, {shutdown, discard}). -t_call_session_takeover_begin_timeout(_) -> - test_call_session({takeover, 'begin'}, timeout). 
+t_stepdown_sessiontakeover_begin_timeout(_) -> + test_stepdown_session({takeover, 'begin'}, timeout). -t_call_session_takeover_begin_noproc(_) -> - test_call_session({takeover, 'begin'}, noproc). +t_stepdown_sessiontakeover_begin_noproc(_) -> + test_stepdown_session({takeover, 'begin'}, noproc). -t_call_session_takeover_end_normal(_) -> - test_call_session({takeover, 'end'}, normal). +t_stepdown_sessiontakeover_end_normal(_) -> + test_stepdown_session({takeover, 'end'}, normal). -t_call_session_takeover_end_shutdown(_) -> - test_call_session({takeover, 'end'}, shutdown). +t_stepdown_sessiontakeover_end_shutdown(_) -> + test_stepdown_session({takeover, 'end'}, shutdown). -t_call_session_takeover_end_shutdown_with_reason(_) -> - test_call_session({takeover, 'end'}, {shutdown, discard}). +t_stepdown_sessiontakeover_end_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'end'}, {shutdown, discard}). -t_call_session_takeover_end_timeout(_) -> - test_call_session({takeover, 'end'}, timeout). +t_stepdown_sessiontakeover_end_timeout(_) -> + test_stepdown_session({takeover, 'end'}, timeout). -t_call_session_takeover_end_noproc(_) -> - test_call_session({takeover, 'end'}, noproc). +t_stepdown_sessiontakeover_end_noproc(_) -> + test_stepdown_session({takeover, 'end'}, noproc). -test_call_session(Action, Reason) -> +test_stepdown_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, FakeSessionFun =