Merge pull request #7026 from HJianBo/force-kill-takover-failure-proc

Force kill takeover failure proc
This commit is contained in:
JianBo He 2022-02-24 14:12:12 +08:00 committed by GitHub
commit b22e341c67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 89 deletions

View File

@ -22,6 +22,7 @@ File format:
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
to force an immediate reload of all certificates after the files are updated on disk. to force an immediate reload of all certificates after the files are updated on disk.
* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983]
* Force shutdown of processes that cannot answer takeover events [#7026]
### Bug fixes ### Bug fixes

View File

@ -1,18 +1,22 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[2-5]">>, [{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[2-5]">>, [{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},

View File

@ -366,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload},
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
{shutdown, kicked, ok, Channel}; {shutdown, kicked, ok, Channel};
handle_call(discard, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]), ?LOG(warning, "Unexpected call: ~p", [Req]),
{reply, {error, unexpected_call}, Channel}. {reply, {error, unexpected_call}, Channel}.

View File

@ -6,6 +6,7 @@
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_connection,brutal_purge,soft_purge,[]}, [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -17,6 +18,7 @@
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
@ -34,6 +36,7 @@
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -54,6 +57,7 @@
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -374,6 +378,8 @@
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_connection,brutal_purge,soft_purge,[]}, [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -382,6 +388,7 @@
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -398,6 +405,7 @@
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -417,6 +425,7 @@
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},

View File

@ -226,18 +226,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(), Self = self(),
ResumeStart = fun(_) -> ResumeStart = fun(_) ->
case takeover_session(ClientId) of CreateSess =
{ok, ConnMod, ChanPid, Session} -> fun() ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo), Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}} {ok, #{session => Session, present => false}}
end,
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
case takeover('end', ConnMod, ChanPid) of
{ok, Pendings} ->
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, _} ->
CreateSess()
end;
{error, _Reason} -> CreateSess()
end end
end, end,
emqx_cm_locker:trans(ClientId, ResumeStart). emqx_cm_locker:trans(ClientId, ResumeStart).
@ -271,9 +278,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
%% TODO: if takeover times out, maybe kill the old? case takeover('begin', ConnMod, ChanPid) of
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, Session} ->
{ok, ConnMod, ChanPid, Session} {ok, ConnMod, ChanPid, Session};
{error, Reason} ->
{error, Reason}
end
end; end;
takeover_session(ClientId, ChanPid) -> takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
@ -286,42 +296,63 @@ discard_session(ClientId) when is_binary(ClientId) ->
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end. end.
%% @private Kick a local stale session to force it step down. %% @private call a local stale session to execute an Action.
%% If failed to kick (e.g. timeout) force a kill. %% If failed to respond (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception %% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody. %% benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok. -spec takeover(Action, module(), pid())
kick_or_kill(Action, ConnMod, Pid) -> -> ok
try | {ok, emqx_session:session() | list(emqx_type:deliver())}
| {error, term()}
when Action :: kick | discard | 'begin' | 'end'.
takeover(Action, ConnMod, Pid) ->
{NAction, Timeout} =
case Action == kick orelse Action == discard of
true -> {Action, ?T_KICK};
_ -> {{takeover, Action},?T_TAKEOVER}
end,
Return =
%% this is essentailly a gen_server:call implemented in emqx_connection %% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection. %% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel %% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) try apply(ConnMod, call, [Pid, NAction, Timeout]) of
catch ok -> ok;
_ : noproc -> % emqx_ws_connection: call Reply -> {ok, Reply}
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); catch
_ : {noproc, _} -> % emqx_connection: gen_server:call _ : noproc -> % emqx_ws_connection: call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
_ : {shutdown, _} -> {error, noproc};
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {noproc, _} -> % emqx_connection: gen_server:call
_ : {{shutdown, _}, _} -> ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); {error, noproc};
_ : {timeout, {gen_server, call, _}} -> _ : Reason = {shutdown, _} ->
?tp(warning, "session_kick_timeout", ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
#{pid => Pid, {error, Reason};
action => Action, _ : Reason = {{shutdown, _}, _} ->
stale_channel => stale_channel_info(Pid) ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
}), {error, Reason};
ok = force_kill(Pid); _ : {timeout, {gen_server, call, _}} ->
_ : Error : St -> ?tp(warning, "takeover_session_timeout",
?tp(error, "session_kick_exception", #{pid => Pid,
#{pid => Pid, action => Action,
action => Action, stale_channel => stale_channel_info(Pid)
reason => Error, }),
stacktrace => St, ok = force_kill(Pid),
stale_channel => stale_channel_info(Pid) {error, timeout};
}), _ : Error : St ->
ok = force_kill(Pid) ?tp(error, "takeover_session_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid),
{error, Error}
end,
case Action == kick orelse Action == discard of
true -> ok;
_ -> Return
end. end.
force_kill(Pid) -> force_kill(Pid) ->
@ -344,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered %% already deregistered
ok; ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid) ok = takeover(Action, ConnMod, ChanPid)
end; end;
kick_session(Action, ClientId, ChanPid) -> kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded %% call remote node on the old APIs because we do not know if they have upgraded

View File

@ -187,57 +187,89 @@ t_open_session_race_condition(_) ->
ok = flush_emqx_pool(), ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) -> t_call_session_discard_normal(_) ->
test_kick_session(discard, normal). test_call_session(discard, normal).
t_kick_session_discard_shutdown(_) -> t_call_session_discard_shutdown(_) ->
test_kick_session(discard, shutdown). test_call_session(discard, shutdown).
t_kick_session_discard_shutdown_with_reason(_) -> t_call_session_discard_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}). test_call_session(discard, {shutdown, discard}).
t_kick_session_discard_timeout(_) -> t_call_session_discard_timeout(_) ->
test_kick_session(discard, timeout). test_call_session(discard, timeout).
t_kick_session_discard_noproc(_) -> t_call_session_discard_noproc(_) ->
test_kick_session(discard, noproc). test_call_session(discard, noproc).
t_kick_session_kick_normal(_) -> t_call_session_kick_normal(_) ->
test_kick_session(discard, normal). test_call_session(kick, normal).
t_kick_session_kick_shutdown(_) -> t_call_session_kick_shutdown(_) ->
test_kick_session(discard, shutdown). test_call_session(kick, shutdown).
t_kick_session_kick_shutdown_with_reason(_) -> t_call_session_kick_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}). test_call_session(kick, {shutdown, discard}).
t_kick_session_kick_timeout(_) -> t_call_session_kick_timeout(_) ->
test_kick_session(discard, timeout). test_call_session(kick, timeout).
t_kick_session_kick_noproc(_) -> t_call_session_kick_noproc(_) ->
test_kick_session(discard, noproc). test_call_session(discard, noproc).
test_kick_session(Action, Reason) -> t_call_session_takeover_begin_normal(_) ->
test_call_session({takeover, 'begin'}, normal).
t_call_session_takeover_begin_shutdown(_) ->
test_call_session({takeover, 'begin'}, shutdown).
t_call_session_takeover_begin_shutdown_with_reason(_) ->
test_call_session({takeover, 'begin'}, {shutdown, discard}).
t_call_session_takeover_begin_timeout(_) ->
test_call_session({takeover, 'begin'}, timeout).
t_call_session_takeover_begin_noproc(_) ->
test_call_session({takeover, 'begin'}, noproc).
t_call_session_takeover_end_normal(_) ->
test_call_session({takeover, 'end'}, normal).
t_call_session_takeover_end_shutdown(_) ->
test_call_session({takeover, 'end'}, shutdown).
t_call_session_takeover_end_shutdown_with_reason(_) ->
test_call_session({takeover, 'end'}, {shutdown, discard}).
t_call_session_takeover_end_timeout(_) ->
test_call_session({takeover, 'end'}, timeout).
t_call_session_takeover_end_noproc(_) ->
test_call_session({takeover, 'end'}, noproc).
test_call_session(Action, Reason) ->
ClientId = rand_client_id(), ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo, #{conninfo := ConnInfo} = ?ChanInfo,
FakeSessionFun = FakeSessionFun =
fun Loop() -> fun Loop() ->
receive receive
{'$gen_call', From, A} when A =:= kick orelse {'$gen_call', From, A} when A =:= kick orelse
A =:= discard -> A =:= discard orelse
case Reason of A =:= {takeover, 'begin'} orelse
normal -> A =:= {takeover, 'end'} ->
gen_server:reply(From, ok); case Reason of
timeout -> normal when A =:= kick orelse A =:= discard ->
%% no response to the call gen_server:reply(From, ok);
Loop(); timeout ->
_ -> %% no response to the call
exit(Reason) Loop();
end; _ ->
Msg -> exit(Reason)
ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), end;
Loop() Msg ->
end ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
Loop()
end
end, end,
{Pid1, _} = spawn_monitor(FakeSessionFun), {Pid1, _} = spawn_monitor(FakeSessionFun),
{Pid2, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun),
@ -249,10 +281,11 @@ test_kick_session(Action, Reason) ->
noproc -> exit(Pid1, kill), exit(Pid2, kill); noproc -> exit(Pid1, kill), exit(Pid2, kill);
_ -> ok _ -> ok
end, end,
ok = case Action of _ = case Action of
kick -> emqx_cm:kick_session(ClientId); kick -> emqx_cm:kick_session(ClientId);
discard -> emqx_cm:discard_session(ClientId) discard -> emqx_cm:discard_session(ClientId);
end, {takeover, _} -> emqx_cm:takeover_session(ClientId)
end,
case Reason =:= timeout orelse Reason =:= noproc of case Reason =:= timeout orelse Reason =:= noproc of
true -> true ->
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),