feat(cm): force shutdown of processes that cannot answer takeover event
Related PR: #6030
This commit is contained in:
parent
c9ff263e59
commit
6dd0b49dd2
|
@ -226,18 +226,24 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
Self = self(),
|
Self = self(),
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case takeover_session(ClientId) of
|
CreateSess =
|
||||||
{ok, ConnMod, ChanPid, Session} ->
|
fun() ->
|
||||||
ok = emqx_session:resume(ClientInfo, Session),
|
|
||||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
|
||||||
{ok, #{session => Session,
|
|
||||||
present => true,
|
|
||||||
pendings => Pendings}};
|
|
||||||
{error, not_found} ->
|
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session, present => false}}
|
{ok, #{session => Session, present => false}}
|
||||||
|
end,
|
||||||
|
case takeover_session(ClientId) of
|
||||||
|
{ok, ConnMod, ChanPid, Session} ->
|
||||||
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
|
case call_or_kill({takeover, 'end'}, ConnMod, ChanPid) of
|
||||||
|
{error, _} -> CreateSess();
|
||||||
|
Pendings ->
|
||||||
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
|
{ok, #{session => Session,
|
||||||
|
present => true,
|
||||||
|
pendings => Pendings}}
|
||||||
|
end;
|
||||||
|
{error, _Reason} -> CreateSess()
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
emqx_cm_locker:trans(ClientId, ResumeStart).
|
emqx_cm_locker:trans(ClientId, ResumeStart).
|
||||||
|
@ -271,9 +277,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
%% TODO: if takeover times out, maybe kill the old?
|
case call_or_kill({takeover, 'begin'}, ConnMod, ChanPid) of
|
||||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
{error, Reason} ->
|
||||||
|
{error, Reason};
|
||||||
|
Session ->
|
||||||
{ok, ConnMod, ChanPid, Session}
|
{ok, ConnMod, ChanPid, Session}
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
takeover_session(ClientId, ChanPid) ->
|
takeover_session(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
||||||
|
@ -286,44 +295,63 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
||||||
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private Kick a local stale session to force it step down.
|
%% @private call a local stale session to execute an Action.
|
||||||
%% If failed to kick (e.g. timeout) force a kill.
|
%% If it fails to respond (e.g. timeout), force a kill.
|
||||||
%% Keeping the stale pid around, or returning error or raise an exception
|
%% Keeping the stale pid around, or returning error or raise an exception
|
||||||
%% benefits nobody.
|
%% benefits nobody.
|
||||||
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
|
-spec call_or_kill(Action, module(), pid())
|
||||||
kick_or_kill(Action, ConnMod, Pid) ->
|
-> ok %% returned by kick, discard
|
||||||
|
| emqx_session:session() %% returned by {takeover, 'begin'}
|
||||||
|
| list(emqx_type:deliver()) %% returned by {takeover, 'end'}
|
||||||
|
when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
|
||||||
|
call_or_kill(Action, ConnMod, Pid) ->
|
||||||
try
|
try
|
||||||
|
Timeout = case Action of
|
||||||
|
{takeover, _} -> ?T_TAKEOVER;
|
||||||
|
_ -> ?T_KICK
|
||||||
|
end,
|
||||||
%% this is essentially a gen_server:call implemented in emqx_connection
|
%% this is essentially a gen_server:call implemented in emqx_connection
|
||||||
%% and emqx_ws_connection.
|
%% and emqx_ws_connection.
|
||||||
%% the handle_call is implemented in emqx_channel
|
%% the handle_call is implemented in emqx_channel
|
||||||
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
|
apply(ConnMod, call, [Pid, Action, Timeout])
|
||||||
catch
|
catch
|
||||||
_ : noproc -> % emqx_ws_connection: call
|
_ : noproc -> % emqx_ws_connection: call
|
||||||
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
|
||||||
|
return_error_if_action_is_takeover(Action, noproc);
|
||||||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||||
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
|
||||||
_ : {shutdown, _} ->
|
return_error_if_action_is_takeover(Action, noproc);
|
||||||
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
_ : Reason = {shutdown, _} ->
|
||||||
_ : {{shutdown, _}, _} ->
|
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
|
||||||
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
return_error_if_action_is_takeover(Action, Reason);
|
||||||
|
_ : Reason = {{shutdown, _}, _} ->
|
||||||
|
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
|
||||||
|
return_error_if_action_is_takeover(Action, Reason);
|
||||||
_ : {timeout, {gen_server, call, _}} ->
|
_ : {timeout, {gen_server, call, _}} ->
|
||||||
?tp(warning, "session_kick_timeout",
|
?tp(warning, "call_session_timeout",
|
||||||
#{pid => Pid,
|
#{pid => Pid,
|
||||||
action => Action,
|
action => Action,
|
||||||
stale_channel => stale_channel_info(Pid)
|
stale_channel => stale_channel_info(Pid)
|
||||||
}),
|
}),
|
||||||
ok = force_kill(Pid);
|
ok = force_kill(Pid),
|
||||||
|
return_error_if_action_is_takeover(Action, timeout);
|
||||||
_ : Error : St ->
|
_ : Error : St ->
|
||||||
?tp(error, "session_kick_exception",
|
?tp(error, "call_session_exception",
|
||||||
#{pid => Pid,
|
#{pid => Pid,
|
||||||
action => Action,
|
action => Action,
|
||||||
reason => Error,
|
reason => Error,
|
||||||
stacktrace => St,
|
stacktrace => St,
|
||||||
stale_channel => stale_channel_info(Pid)
|
stale_channel => stale_channel_info(Pid)
|
||||||
}),
|
}),
|
||||||
ok = force_kill(Pid)
|
ok = force_kill(Pid),
|
||||||
|
return_error_if_action_is_takeover(Action, Error)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
return_error_if_action_is_takeover({takeover, _}, Reason) ->
|
||||||
|
{error, Reason};
|
||||||
|
return_error_if_action_is_takeover(_, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
force_kill(Pid) ->
|
force_kill(Pid) ->
|
||||||
exit(Pid, kill),
|
exit(Pid, kill),
|
||||||
ok.
|
ok.
|
||||||
|
@ -344,7 +372,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
%% already deregistered
|
%% already deregistered
|
||||||
ok;
|
ok;
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
ok = kick_or_kill(Action, ConnMod, ChanPid)
|
ok = call_or_kill(Action, ConnMod, ChanPid)
|
||||||
end;
|
end;
|
||||||
kick_session(Action, ClientId, ChanPid) ->
|
kick_session(Action, ClientId, ChanPid) ->
|
||||||
%% call remote node on the old APIs because we do not know if they have upgraded
|
%% call remote node on the old APIs because we do not know if they have upgraded
|
||||||
|
|
Loading…
Reference in New Issue