diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 148627628..af96c295a 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -22,6 +22,7 @@ File format: * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, to force an immediate reload of all certificates after the files are updated on disk. * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] +* Force shutdown of processe that cannot answer takeover event [#7026] ### Bug fixes diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 590a2106f..e66e8a58c 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,18 +1,22 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[2-5]">>, + [{<<"4\\.3\\.[4-5]">>, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 130ad155a..527f2e9b7 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -366,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload}, handle_call(kick, Channel) -> {shutdown, kicked, ok, Channel}; +handle_call(discard, Channel) -> + {shutdown, discarded, ok, Channel}; + handle_call(Req, Channel) -> ?LOG(warning, "Unexpected call: ~p", [Req]), {reply, {error, unexpected_call}, Channel}. diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 610b40c75..6e9c43d01 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -6,6 +6,7 @@ {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -17,6 +18,7 @@ {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, @@ -34,6 +36,7 @@ {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -54,6 +57,7 @@ {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -374,6 +378,8 @@ {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -382,6 +388,7 @@ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -398,6 +405,7 @@ {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -417,6 +425,7 @@ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 23f078568..002b1a746 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -226,18 +226,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> - case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, - present => true, - pendings => Pendings}}; - {error, not_found} -> + CreateSess = + fun() -> Session = create_session(ClientInfo, ConnInfo), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} + end, + case takeover_session(ClientId) of + {ok, ConnMod, ChanPid, Session} -> + ok = emqx_session:resume(ClientInfo, Session), + case takeover('end', ConnMod, ChanPid) of + {ok, Pendings} -> + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session, + present => true, + pendings => Pendings}}; + {error, _} -> + CreateSess() + end; + {error, _Reason} -> CreateSess() end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -271,9 +278,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - %% TODO: if takeover times out, maybe kill the old? - Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {ok, ConnMod, ChanPid, Session} + case takeover('begin', ConnMod, ChanPid) of + {ok, Session} -> + {ok, ConnMod, ChanPid, Session}; + {error, Reason} -> + {error, Reason} + end end; takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). @@ -286,42 +296,63 @@ discard_session(ClientId) when is_binary(ClientId) -> ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -%% @private Kick a local stale session to force it step down. -%% If failed to kick (e.g. timeout) force a kill. +%% @private call a local stale session to execute an Action. +%% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec kick_or_kill(kick | discard, module(), pid()) -> ok. -kick_or_kill(Action, ConnMod, Pid) -> - try +-spec takeover(Action, module(), pid()) + -> ok + | {ok, emqx_session:session() | list(emqx_type:deliver())} + | {error, term()} + when Action :: kick | discard | 'begin' | 'end'. +takeover(Action, ConnMod, Pid) -> + {NAction, Timeout} = + case Action == kick orelse Action == discard of + true -> {Action, ?T_KICK}; + _ -> {{takeover, Action},?T_TAKEOVER} + end, + Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. %% the handle_call is implemented in emqx_channel - ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) - catch - _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid); - _ : Error : St -> - ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid) + try apply(ConnMod, call, [Pid, NAction, Timeout]) of + ok -> ok; + Reply -> {ok, Reply} + catch + _ : noproc -> % emqx_ws_connection: call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "takeover_session_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, timeout}; + _ : Error : St -> + ?tp(error, "takeover_session_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, Error} + end, + case Action == kick orelse Action == discard of + true -> ok; + _ -> Return end. force_kill(Pid) -> @@ -344,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = kick_or_kill(Action, ConnMod, ChanPid) + ok = takeover(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index a46c046ca..c82e525f0 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -187,57 +187,89 @@ t_open_session_race_condition(_) -> ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_kick_session_discard_normal(_) -> - test_kick_session(discard, normal). +t_call_session_discard_normal(_) -> + test_call_session(discard, normal). -t_kick_session_discard_shutdown(_) -> - test_kick_session(discard, shutdown). +t_call_session_discard_shutdown(_) -> + test_call_session(discard, shutdown). -t_kick_session_discard_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_call_session_discard_shutdown_with_reason(_) -> + test_call_session(discard, {shutdown, discard}). -t_kick_session_discard_timeout(_) -> - test_kick_session(discard, timeout). +t_call_session_discard_timeout(_) -> + test_call_session(discard, timeout). -t_kick_session_discard_noproc(_) -> - test_kick_session(discard, noproc). +t_call_session_discard_noproc(_) -> + test_call_session(discard, noproc). -t_kick_session_kick_normal(_) -> - test_kick_session(discard, normal). +t_call_session_kick_normal(_) -> + test_call_session(kick, normal). -t_kick_session_kick_shutdown(_) -> - test_kick_session(discard, shutdown). +t_call_session_kick_shutdown(_) -> + test_call_session(kick, shutdown). -t_kick_session_kick_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_call_session_kick_shutdown_with_reason(_) -> + test_call_session(kick, {shutdown, discard}). -t_kick_session_kick_timeout(_) -> - test_kick_session(discard, timeout). +t_call_session_kick_timeout(_) -> + test_call_session(kick, timeout). -t_kick_session_kick_noproc(_) -> - test_kick_session(discard, noproc). +t_call_session_kick_noproc(_) -> + test_call_session(discard, noproc). -test_kick_session(Action, Reason) -> +t_call_session_takeover_begin_normal(_) -> + test_call_session({takeover, 'begin'}, normal). + +t_call_session_takeover_begin_shutdown(_) -> + test_call_session({takeover, 'begin'}, shutdown). + +t_call_session_takeover_begin_shutdown_with_reason(_) -> + test_call_session({takeover, 'begin'}, {shutdown, discard}). + +t_call_session_takeover_begin_timeout(_) -> + test_call_session({takeover, 'begin'}, timeout). + +t_call_session_takeover_begin_noproc(_) -> + test_call_session({takeover, 'begin'}, noproc). + +t_call_session_takeover_end_normal(_) -> + test_call_session({takeover, 'end'}, normal). + +t_call_session_takeover_end_shutdown(_) -> + test_call_session({takeover, 'end'}, shutdown). + +t_call_session_takeover_end_shutdown_with_reason(_) -> + test_call_session({takeover, 'end'}, {shutdown, discard}). + +t_call_session_takeover_end_timeout(_) -> + test_call_session({takeover, 'end'}, timeout). + +t_call_session_takeover_end_noproc(_) -> + test_call_session({takeover, 'end'}, noproc). + +test_call_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, FakeSessionFun = fun Loop() -> - receive - {'$gen_call', From, A} when A =:= kick orelse - A =:= discard -> - case Reason of - normal -> - gen_server:reply(From, ok); - timeout -> - %% no response to the call - Loop(); - _ -> - exit(Reason) - end; - Msg -> - ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), - Loop() - end + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard orelse + A =:= {takeover, 'begin'} orelse + A =:= {takeover, 'end'} -> + case Reason of + normal when A =:= kick orelse A =:= discard -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end end, {Pid1, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun), @@ -249,10 +281,11 @@ test_kick_session(Action, Reason) -> noproc -> exit(Pid1, kill), exit(Pid2, kill); _ -> ok end, - ok = case Action of - kick -> emqx_cm:kick_session(ClientId); - discard -> emqx_cm:discard_session(ClientId) - end, + _ = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId); + {takeover, _} -> emqx_cm:takeover_session(ClientId) + end, case Reason =:= timeout orelse Reason =:= noproc of true -> ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),