From 93bdee80eaa304347e9d79957c6790c3b670cc24 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 11 Apr 2022 01:00:37 +0800 Subject: [PATCH] hitofix: update emqx_cm module to e4.2.10 --- src/emqx_cm.erl | 163 +++++++++++++++++++++++++++------------- src/emqx_connection.erl | 5 +- 2 files changed, 114 insertions(+), 54 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 200a5f08e..2e917ca79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -70,7 +70,10 @@ ]). %% Internal export --export([stats_fun/0]). +-export([stats_fun/0, clean_down/1]). + +%% Test export +-export([register_channel_/3]). -type(chan_pid() :: pid()). @@ -91,6 +94,10 @@ %% Server name -define(CM, ?MODULE). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). +-define(T_TAKEOVER, 15000). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -159,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -184,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -222,7 +229,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, @@ -252,7 +259,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), + ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -264,67 +271,111 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - Session = ConnMod:call(ChanPid, {takeover, 'begin'}), + %% TODO: if takeover times out, maybe kill the old? + Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; - takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]). + rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> - lists:foreach( - fun(ChanPid) -> - try - discard_session(ClientId, ChanPid) - catch - _:{noproc,_}:_Stk -> ok; - _:{{shutdown,_},_}:_Stk -> ok; - _:Error:_Stk -> - ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) - end - end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(ClientId, ChanPid) of - undefined -> ok; - ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard) - end; +%% @private Kick a local stale session to force it step down. +%% If failed to kick (e.g. timeout) force a kill. +%% Keeping the stale pid around, or returning error or raise an exception +%% benefits nobody. +-spec kick_or_kill(kick | discard, module(), pid()) -> ok. +kick_or_kill(Action, ConnMod, Pid) -> + try + %% this is essentailly a gen_server:call implemented in emqx_connection + %% and emqx_ws_connection. + %% the handle_call is implemented in emqx_channel + ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) + catch + _ : noproc -> % emqx_ws_connection: call + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; + _ : {shutdown, _} -> + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; + _ : {{shutdown, _}, _} -> + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; + _ : {timeout, {gen_server, call, _}} -> + ?LOG(warning, "session_kick_timeout: ~p, action: ~p, " + "stale_channel: ~p", + [Pid, Action, stale_channel_info(Pid)]), + ok = force_kill(Pid); + _ : Error -> + ?LOG(error, "session_kick_exception: ~p, action: ~p, " + "reason: ~p, stacktrace: ~p, stale_channel: ~p", + [Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]), + ok = force_kill(Pid) + end. + +force_kill(Pid) -> + exit(Pid, kill), + ok. + +stale_channel_info(Pid) -> + process_info(Pid, [status, message_queue_len, current_stacktrace]). discard_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]). + kick_session(discard, ClientId, ChanPid). + +kick_session(ClientId, ChanPid) -> + kick_session(kick, ClientId, ChanPid). + +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = kick_or_kill(Action, ConnMod, ChanPid) + end; +kick_session(Action, ClientId, ChanPid) -> + %% call remote node on the old APIs because we do not know if they have upgraded + %% to have kick_session/3 + Function = case Action of + discard -> discard_session; + kick -> kick_session + end, + try + rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + catch + Error : Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p", + [node(ChanPid), Action, Error, Reason]) + end. kick_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; - [ChanPid] -> - kick_session(ClientId, ChanPid); + [] -> + ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ok; ChanPids -> - [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "More than one channel found: ~p", [ChanPids]), - lists:foreach(fun(StalePid) -> - catch discard_session(ClientId, StalePid) - end, StalePids), - kick_session(ClientId, ChanPid) + case length(ChanPids) > 1 of + true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]); + false -> ok + end, + lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) end. -kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick); - undefined -> - {error, not_found} - end; - -kick_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]). - %% @doc Is clean start? % is_clean_start(#{clean_start := false}) -> false; % is_clean_start(_Attrs) -> true. @@ -360,10 +411,16 @@ lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> error(Reason); - Res -> Res +rpc_call(Node, Fun, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of + {badrpc, Reason} -> + %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% should catch all exceptions and always return 'ok'. + %% This leaves 'badrpc' only possible when there is problem + %% calling the remote node. + error({badrpc, Reason}); + Res -> + Res end. %% @private @@ -396,7 +453,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> @@ -432,5 +489,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 720b7ec98..8a3e5e35d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -38,7 +38,7 @@ , stats/1 ]). --export([call/2]). +-export([call/2, call/3]). %% Callback -export([init/4]). @@ -170,6 +170,9 @@ stats(#state{transport = Transport, call(Pid, Req) -> gen_server:call(Pid, Req, infinity). +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). + stop(Pid) -> gen_server:stop(Pid).