From 4885171e4f177cb6f19dfa0913eb7dd6bd9a6111 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 15 Apr 2021 22:04:37 +0800 Subject: [PATCH] fix(emqx): add timeout for open/kick a session --- src/emqx_cm.erl | 10 ++++++---- src/emqx_connection.erl | 8 ++++++-- src/emqx_ws_connection.erl | 11 ++++++++--- test/emqx_cm_SUITE.erl | 25 +++++++++++++------------ 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index cb9792adb..ff27c51a0 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -92,6 +92,8 @@ %% Server name -define(CM, ?MODULE). +-define(T_TAKEOVER, 15000). + %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). start_link() -> @@ -223,7 +225,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, @@ -265,7 +267,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - Session = ConnMod:call(ChanPid, {takeover, 'begin'}), + Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; @@ -295,7 +297,7 @@ discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard) + ConnMod:call(ChanPid, discard, ?T_TAKEOVER) end; discard_session(ClientId, ChanPid) -> @@ -318,7 +320,7 @@ kick_session(ClientId) -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chan_info(ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick); + ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} end; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index eee3b9c5a..96a32cd01 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -41,7 +41,9 @@ , stats/1 ]). --export([call/2]). +-export([ call/2 + , call/3 + ]). %% Callback -export([init/4]). @@ -183,7 +185,9 @@ stats(#state{transport = Transport, lists:append([SockStats, ConnStats, ChanStats, ProcStats]). call(Pid, Req) -> - gen_server:call(Pid, Req, infinity). + call(Pid, Req, infinity). +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). stop(Pid) -> gen_server:stop(Pid). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 664ba568f..4925b6815 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -34,7 +34,9 @@ , stats/1 ]). --export([call/2]). +-export([ call/2 + , call/3 + ]). %% WebSocket callbacks -export([ init/2 @@ -151,7 +153,10 @@ stats(#state{channel = Channel}) -> %% kick|discard|takeover -spec(call(pid(), Req :: term()) -> Reply :: term()). -call(WsPid, Req) when is_pid(WsPid) -> +call(WsPid, Req) -> + call(WsPid, Req, 5000). + +call(WsPid, Req, Timeout) when is_pid(WsPid) -> Mref = erlang:monitor(process, WsPid), WsPid ! {call, {self(), Mref}, Req}, receive @@ -160,7 +165,7 @@ call(WsPid, Req) when is_pid(WsPid) -> Reply; {'DOWN', Mref, _, _, Reason} -> exit(Reason) - after 5000 -> + after Timeout -> erlang:demonitor(Mref, [flush]), exit(timeout) end. diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index ed4ddd7eb..3b2ced113 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -179,13 +179,13 @@ t_discard_session(_) -> t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), - erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - receive - {'$gen_call', From, {takeover, 'begin'}} -> - gen_server:reply(From, test), ok - end - end), + erlang:spawn_link(fun() -> + ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), + receive + {'$gen_call', From, {takeover, 'begin'}} -> + gen_server:reply(From, test), ok + end + end), timer:sleep(100), {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). @@ -198,12 +198,13 @@ t_kick_session(_) -> ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), test = emqx_cm:kick_session(<<"clientid">>), - erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), + erlang:spawn_link( + fun() -> + ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), + ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), - timer:sleep(1000) - end), + timer:sleep(1000) + end), ct:sleep(100), test = emqx_cm:kick_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>),