diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 200a5f08e..36a280714 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -88,6 +88,8 @@ %% Batch drain -define(BATCH_SIZE, 100000). +-define(T_TAKEOVER, 15000). + %% Server name -define(CM, ?MODULE). @@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, @@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - Session = ConnMod:call(ChanPid, {takeover, 'begin'}), + Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; @@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; ChanPids -> - lists:foreach( - fun(ChanPid) -> - try - discard_session(ClientId, ChanPid) - catch - _:{noproc,_}:_Stk -> ok; - _:{{shutdown,_},_}:_Stk -> ok; - _:Error:_Stk -> - ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) - end - end, ChanPids) + lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + end. + +do_discard_session(ClientId, Pid) -> + try + discard_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {'EXIT', {noproc, _}} -> % rpc_call/3 + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {{shutdown, _}, _} -> + ?LOG(debug, "session_already_shutdown: ~p", [Pid]), + ok; + _ : Error : St -> + ?LOG(debug, "failed_to_discard_session: ~p, " + "error: ~p, stacktrace: ~0p", [Pid, Error, St]) end. discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard) + ConnMod:call(ChanPid, discard, ?T_TAKEOVER) end; discard_session(ClientId, ChanPid) -> @@ -317,7 +330,7 @@ kick_session(ClientId) -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chan_info(ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick); + ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} end; @@ -361,7 +374,7 @@ lookup_channels(local, ClientId) -> %% @private rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of + case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of {badrpc, Reason} -> error(Reason); Res -> Res end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 720b7ec98..69337adf2 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -38,7 +38,7 @@ , stats/1 ]). --export([call/2]). +-export([call/2, call/3]). %% Callback -export([init/4]). @@ -168,7 +168,9 @@ stats(#state{transport = Transport, lists:append([SockStats, ConnStats, ChanStats, ProcStats]). call(Pid, Req) -> - gen_server:call(Pid, Req, infinity). + call(Pid, Req, infinity). +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). stop(Pid) -> gen_server:stop(Pid). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index aacc4c4f2..ac975850e 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -34,7 +34,7 @@ , stats/1 ]). --export([call/2]). +-export([call/2, call/3]). %% WebSocket callbacks -export([ init/2 @@ -151,7 +151,10 @@ stats(#state{channel = Channel}) -> %% kick|discard|takeover -spec(call(pid(), Req :: term()) -> Reply :: term()). -call(WsPid, Req) when is_pid(WsPid) -> +call(WsPid, Req) -> + call(WsPid, Req, 5000). + +call(WsPid, Req, Timeout) when is_pid(WsPid) -> Mref = erlang:monitor(process, WsPid), WsPid ! {call, {self(), Mref}, Req}, receive @@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) -> Reply; {'DOWN', Mref, _, _, Reason} -> exit(Reason) - after 5000 -> + after Timeout -> erlang:demonitor(Mref, [flush]), exit(timeout) end. diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 6a4b84d01..b32cdaf23 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -77,7 +77,7 @@ t_get_set_chan_stats(_) -> t_open_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), ClientInfo = #{zone => external, clientid => <<"clientid">>, @@ -153,14 +153,14 @@ t_open_session_race_condition(_) -> t_discard_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), - ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). @@ -180,7 +180,7 @@ t_takeover_session(_) -> t_kick_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), {error, not_found} = emqx_cm:kick_session(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), test = emqx_cm:kick_session(<<"clientid">>),