fix(emqx): add timeout for open/kick a session

This commit is contained in:
Shawn 2021-04-15 22:04:37 +08:00 committed by turtleDeng
parent ebac8c1612
commit 4885171e4f
4 changed files with 33 additions and 21 deletions

View File

@ -92,6 +92,8 @@
%% Server name %% Server name
-define(CM, ?MODULE). -define(CM, ?MODULE).
-define(T_TAKEOVER, 15000).
%% @doc Start the channel manager. %% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
@ -223,7 +225,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
present => true, present => true,
@ -265,7 +267,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}), Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session} {ok, ConnMod, ChanPid, Session}
end; end;
@ -295,7 +297,7 @@ discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok; undefined -> ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard) ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end; end;
discard_session(ClientId, ChanPid) -> discard_session(ClientId, ChanPid) ->
@ -318,7 +320,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} -> #{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick); ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined -> undefined ->
{error, not_found} {error, not_found}
end; end;

View File

@ -41,7 +41,9 @@
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([ call/2
, call/3
]).
%% Callback %% Callback
-export([init/4]). -export([init/4]).
@ -183,7 +185,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) -> call(Pid, Req) ->
gen_server:call(Pid, Req, infinity). call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) -> stop(Pid) ->
gen_server:stop(Pid). gen_server:stop(Pid).

View File

@ -34,7 +34,9 @@
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([ call/2
, call/3
]).
%% WebSocket callbacks %% WebSocket callbacks
-export([ init/2 -export([ init/2
@ -151,7 +153,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover %% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()). -spec(call(pid(), Req :: term()) -> Reply :: term()).
call(WsPid, Req) when is_pid(WsPid) -> call(WsPid, Req) ->
call(WsPid, Req, 5000).
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid), Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req}, WsPid ! {call, {self(), Mref}, Req},
receive receive
@ -160,7 +165,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply; Reply;
{'DOWN', Mref, _, _, Reason} -> {'DOWN', Mref, _, _, Reason} ->
exit(Reason) exit(Reason)
after 5000 -> after Timeout ->
erlang:demonitor(Mref, [flush]), erlang:demonitor(Mref, [flush]),
exit(timeout) exit(timeout)
end. end.

View File

@ -179,7 +179,7 @@ t_discard_session(_) ->
t_takeover_session(_) -> t_takeover_session(_) ->
#{conninfo := ConnInfo} = ?ChanInfo, #{conninfo := ConnInfo} = ?ChanInfo,
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>), {error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
erlang:spawn(fun() -> erlang:spawn_link(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
receive receive
{'$gen_call', From, {takeover, 'begin'}} -> {'$gen_call', From, {takeover, 'begin'}} ->
@ -198,7 +198,8 @@ t_kick_session(_) ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
test = emqx_cm:kick_session(<<"clientid">>), test = emqx_cm:kick_session(<<"clientid">>),
erlang:spawn(fun() -> erlang:spawn_link(
fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),