fix: safer session calls

This commit is contained in:
JianBo He 2021-09-24 22:41:53 +08:00
parent aa5d274464
commit 7b177a7929
4 changed files with 43 additions and 25 deletions

View File

@ -88,6 +88,8 @@
%% Batch drain %% Batch drain
-define(BATCH_SIZE, 100000). -define(BATCH_SIZE, 100000).
-define(T_TAKEOVER, 15000).
%% Server name %% Server name
-define(CM, ?MODULE). -define(CM, ?MODULE).
@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
case takeover_session(ClientId) of case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel_(ClientId, Self, ConnInfo), register_channel_(ClientId, Self, ConnInfo),
{ok, #{session => Session, {ok, #{session => Session,
present => true, present => true,
@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}), Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session} {ok, ConnMod, ChanPid, Session}
end; end;
@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) ->
case lookup_channels(ClientId) of case lookup_channels(ClientId) of
[] -> ok; [] -> ok;
ChanPids -> ChanPids ->
lists:foreach( lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
fun(ChanPid) -> end.
try
discard_session(ClientId, ChanPid) do_discard_session(ClientId, Pid) ->
catch try
_:{noproc,_}:_Stk -> ok; discard_session(ClientId, Pid)
_:{{shutdown,_},_}:_Stk -> ok; catch
_:Error:_Stk -> _ : noproc -> % emqx_ws_connection: call
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) ?LOG(debug, "session_already_gone: ~p", [Pid]),
end ok;
end, ChanPids) _ : {noproc, _} -> % emqx_connection: gen_server:call
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
?LOG(debug, "session_already_gone: ~p", [Pid]),
ok;
_ : {{shutdown, _}, _} ->
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
ok;
_ : Error : St ->
?LOG(debug, "failed_to_discard_session: ~p, "
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
end. end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> ok; undefined -> ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard) ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
end; end;
discard_session(ClientId, ChanPid) -> discard_session(ClientId, ChanPid) ->
@ -317,7 +330,7 @@ kick_session(ClientId) ->
kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} -> #{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick); ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
undefined -> undefined ->
{error, not_found} {error, not_found}
end; end;
@ -361,7 +374,7 @@ lookup_channels(local, ClientId) ->
%% @private %% @private
rpc_call(Node, Fun, Args) -> rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
{badrpc, Reason} -> error(Reason); {badrpc, Reason} -> error(Reason);
Res -> Res Res -> Res
end. end.

View File

@ -38,7 +38,7 @@
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([call/2, call/3]).
%% Callback %% Callback
-export([init/4]). -export([init/4]).
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) -> call(Pid, Req) ->
gen_server:call(Pid, Req, infinity). call(Pid, Req, infinity).
call(Pid, Req, Timeout) ->
gen_server:call(Pid, Req, Timeout).
stop(Pid) -> stop(Pid) ->
gen_server:stop(Pid). gen_server:stop(Pid).

View File

@ -34,7 +34,7 @@
, stats/1 , stats/1
]). ]).
-export([call/2]). -export([call/2, call/3]).
%% WebSocket callbacks %% WebSocket callbacks
-export([ init/2 -export([ init/2
@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
%% kick|discard|takeover %% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()). -spec(call(pid(), Req :: term()) -> Reply :: term()).
call(WsPid, Req) when is_pid(WsPid) -> call(WsPid, Req) ->
call(WsPid, Req, 5000).
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
Mref = erlang:monitor(process, WsPid), Mref = erlang:monitor(process, WsPid),
WsPid ! {call, {self(), Mref}, Req}, WsPid ! {call, {self(), Mref}, Req},
receive receive
@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
Reply; Reply;
{'DOWN', Mref, _, _, Reason} -> {'DOWN', Mref, _, _, Reason} ->
exit(Reason) exit(Reason)
after 5000 -> after Timeout ->
erlang:demonitor(Mref, [flush]), erlang:demonitor(Mref, [flush]),
exit(timeout) exit(timeout)
end. end.

View File

@ -77,7 +77,7 @@ t_get_set_chan_stats(_) ->
t_open_session(_) -> t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ClientInfo = #{zone => external, ClientInfo = #{zone => external,
clientid => <<"clientid">>, clientid => <<"clientid">>,
@ -153,14 +153,14 @@ t_open_session_race_condition(_) ->
t_discard_session(_) -> t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection). ok = meck:unload(emqx_connection).
@ -180,7 +180,7 @@ t_takeover_session(_) ->
t_kick_session(_) -> t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>), {error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>), test = emqx_cm:kick_session(<<"clientid">>),