Merge pull request #6030 from zmstone/fix-force-kill-after-kick-or-discard-timeout

fix(session): force kill session for 'kick' and 'discard'
This commit is contained in:
Zaiming (Stone) Shi 2021-10-31 09:48:04 +01:00 committed by GitHub
commit 831f2eda0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 212 additions and 107 deletions

View File

@ -191,10 +191,8 @@ clients(["show", ClientId]) ->
if_client(ClientId, fun print/1);
clients(["kick", ClientId]) ->
case emqx_cm:kick_session(bin(ClientId)) of
ok -> emqx_ctl:print("ok~n");
_ -> emqx_ctl:print("Not Found.~n")
end;
ok = emqx_cm:kick_session(bin(ClientId)),
emqx_ctl:print("ok~n");
clients(_) ->
emqx_ctl:usage([{"clients list", "List all clients"},

View File

@ -158,9 +158,9 @@ t_clients_cmd(_) ->
timer:sleep(300),
emqx_mgmt_cli:clients(["list"]),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "client12")),
?assertEqual((emqx_mgmt_cli:clients(["kick", "client12"])), "ok~n"),
?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])),
timer:sleep(500),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "Not Found")),
?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])),
receive
{'EXIT', T, _} ->
ok

View File

@ -223,8 +223,8 @@ t_clients(_) ->
timer:sleep(300),
{ok, NotFound0} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?ERROR12, get(<<"code">>, NotFound0)),
{ok, Ok1} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?SUCCESS, get(<<"code">>, Ok1)),
{ok, Clients6} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
?assertEqual(1, maps:get(<<"count">>, get(<<"meta">>, Clients6))),

View File

@ -1,21 +1,24 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.9",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@ -24,7 +27,8 @@
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
@ -137,21 +141,24 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.9",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -160,7 +167,8 @@
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},

View File

@ -72,7 +72,7 @@
]).
%% Internal export
-export([stats_fun/0]).
-export([stats_fun/0, clean_down/1]).
-type(chan_pid() :: pid()).
@ -93,7 +93,9 @@
%% Server name
-define(CM, ?MODULE).
-define(T_TAKEOVER, 15000).
-define(T_KICK, 5_000).
-define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000).
%% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()).
@ -164,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_info(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Update infos of the channel.
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@ -189,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@ -257,7 +259,7 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
@ -269,77 +271,113 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
%% TODO: if takeover times out, maybe kill the old?
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
end;
takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
%% @doc Discard all the sessions identified by the ClientId.
-spec(discard_session(emqx_types:clientid()) -> ok).
discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of
[] -> ok;
ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end.
do_discard_session(ClientId, Pid) ->
%% @private Kick a local stale session to force it step down.
%% If failed to kick (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
discard_session(ClientId, Pid)
%% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
catch
_ : noproc -> % emqx_ws_connection: call
?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
_ : {noproc, _} -> % emqx_connection: gen_server:call
?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?tp(debug, "session_already_gone", #{pid => Pid}),
ok;
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
_ : {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
_ : {{shutdown, _}, _} ->
?tp(debug, "session_already_shutdown", #{pid => Pid}),
ok;
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
_ : {timeout, {gen_server, call, _}} ->
?tp(warning, "session_kick_timeout",
#{pid => Pid,
action => Action,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid);
_ : Error : St ->
?tp(error, "failed_to_discard_session",
#{pid => Pid, reason => Error, stacktrace=>St})
?tp(error, "session_kick_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid)
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok;
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end;
force_kill(Pid) ->
exit(Pid, kill),
ok.
stale_channel_info(Pid) ->
process_info(Pid, [status, message_queue_len, current_stacktrace]).
discard_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
kick_session(discard, ClientId, ChanPid).
kick_session(ClientId, ChanPid) ->
kick_session(kick, ClientId, ChanPid).
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of
undefined ->
%% already deregistered
ok;
ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid)
end;
kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded
%% to have kick_session/3
Function = case Action of
discard -> discard_session;
kick -> kick_session
end,
try
rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
catch
Error : Reason ->
%% This should mostly be RPC failures.
%% However, if the node is still running the old version
%% code (prior to emqx app 4.3.10) some of the RPC handler
%% exceptions may get propagated to a new version node
?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
[node(ChanPid), Action, Error, Reason])
end.
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] -> {error, not_found};
[ChanPid] ->
kick_session(ClientId, ChanPid);
[] ->
?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
ok;
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
kick_session(ClientId, ChanPid)
case length(ChanPids) > 1 of
true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
false -> ok
end,
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
end.
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined ->
{error, not_found}
end;
kick_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
%% @doc Is clean start?
% is_clean_start(#{clean_start := false}) -> false;
% is_clean_start(_Attrs) -> true.
@ -375,10 +413,16 @@ lookup_channels(local, ClientId) ->
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
%% @private
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
{badrpc, Reason} -> error(Reason);
Res -> Res
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
{badrpc, Reason} ->
%% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
%% should catch all exceptions and always return 'ok'.
%% This leaves 'badrpc' only possible when there is problem
%% calling the remote node.
error({badrpc, Reason});
Res ->
Res
end.
%% @private
@ -411,7 +455,7 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
@ -447,5 +491,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).

View File

@ -32,6 +32,12 @@
conn_mod => emqx_connection,
receive_maximum => 100}}).
-define(WAIT(PATTERN, TIMEOUT, RET),
fun() ->
receive PATTERN -> RET
after TIMEOUT -> error({timeout, ?LINE}) end
end()).
%%--------------------------------------------------------------------
%% CT callbacks
%%--------------------------------------------------------------------
@ -180,25 +186,95 @@ t_open_session_race_condition(_) ->
ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_discard_session(_) ->
t_kick_session_discard_normal(_) ->
test_kick_session(discard, normal).
t_kick_session_discard_shutdown(_) ->
test_kick_session(discard, shutdown).
t_kick_session_discard_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}).
t_kick_session_discard_timeout(_) ->
test_kick_session(discard, timeout).
t_kick_session_discard_noproc(_) ->
test_kick_session(discard, noproc).
t_kick_session_kick_normal(_) ->
test_kick_session(discard, normal).
t_kick_session_kick_shutdown(_) ->
test_kick_session(discard, shutdown).
t_kick_session_kick_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}).
t_kick_session_kick_timeout(_) ->
test_kick_session(discard, timeout).
t_kick_session_kick_noproc(_) ->
test_kick_session(discard, noproc).
test_kick_session(Action, Reason) ->
ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo,
ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
FakeSessionFun =
fun Loop() ->
receive
{'$gen_call', From, A} when A =:= kick orelse
A =:= discard ->
case Reason of
normal ->
gen_server:reply(From, ok);
timeout ->
%% no response to the call
Loop();
_ ->
exit(Reason)
end;
Msg ->
ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
Loop()
end
end,
{Pid1, _} = spawn_monitor(FakeSessionFun),
{Pid2, _} = spawn_monitor(FakeSessionFun),
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo),
?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
case Reason of
noproc -> exit(Pid1, kill), exit(Pid2, kill);
_ -> ok
end,
ok = case Action of
kick -> emqx_cm:kick_session(ClientId);
discard -> emqx_cm:discard_session(ClientId)
end,
case Reason =:= timeout orelse Reason =:= noproc of
true ->
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R));
false ->
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R))
end,
ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ok = emqx_cm:discard_session(ClientId),
ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
ok = emqx_cm:discard_session(ClientId),
ok = emqx_cm:unregister_channel(ClientId),
ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
ok = emqx_cm:discard_session(ClientId),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
ok = emqx_cm:discard_session(ClientId),
ok = emqx_cm:unregister_channel(ClientId),
ok = meck:unload(emqx_connection).
%% Channel deregistration is delegated to emqx_pool as a sync tasks.
%% The emqx_pool is pool of workers, and there is no way to know
%% which worker was picked for the last deregistration task.
%% This help function creates a large enough number of async tasks
%% to sync with the pool workers.
%% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks.
flush_emqx_pool() ->
Self = self(),
L = lists:seq(1, 1000),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
t_discard_session_race(_) ->
ClientId = rand_client_id(),
@ -231,27 +307,6 @@ t_takeover_session(_) ->
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
emqx_cm:unregister_channel(<<"clientid">>).
t_kick_session(_) ->
Info = #{conninfo := ConnInfo} = ?ChanInfo,
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
test = emqx_cm:kick_session(<<"clientid">>),
erlang:spawn_link(
fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
timer:sleep(1000)
end),
ct:sleep(100),
test = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
t_all_channels(_) ->
?assertEqual(true, is_list(emqx_cm:all_channels())).