fix(emqx_cm): replace ?tp with ?LOG

This commit is contained in:
JianBo He 2021-11-09 18:27:17 +08:00
parent 915c827fdc
commit 3d7f4335a0
3 changed files with 32 additions and 30 deletions

View File

@ -945,7 +945,7 @@ handle_info({sock_closed, Reason}, Channel =
Shutdown -> Shutdown Shutdown -> Shutdown
end; end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
%% Since sock_closed messages can be generated multiple times, %% Since sock_closed messages can be generated multiple times,
%% we can simply ignore errors of this type in the disconnected state. %% we can simply ignore errors of this type in the disconnected state.
%% e.g. when the socket send function returns an error, there is already %% e.g. when the socket send function returns an error, there is already

View File

@ -72,6 +72,9 @@
%% Internal export %% Internal export
-export([stats_fun/0, clean_down/1]). -export([stats_fun/0, clean_down/1]).
%% Test export
-export([register_channel_/3]).
-type(chan_pid() :: pid()). -type(chan_pid() :: pid()).
%% Tables for channel management. %% Tables for channel management.
@ -88,14 +91,12 @@
%% Batch drain %% Batch drain
-define(BATCH_SIZE, 100000). -define(BATCH_SIZE, 100000).
-define(T_TAKEOVER, 15000).
%% Server name %% Server name
-define(CM, ?MODULE). -define(CM, ?MODULE).
-define(T_KICK, 5_000). -define(T_KICK, 5000).
-define(T_GET_INFO, 5_000). -define(T_GET_INFO, 5000).
-define(T_TAKEOVER, 15_000). -define(T_TAKEOVER, 15000).
%% @doc Start the channel manager. %% @doc Start the channel manager.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
@ -298,28 +299,26 @@ kick_or_kill(Action, ConnMod, Pid) ->
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
catch catch
_ : noproc -> % emqx_ws_connection: call _ : noproc -> % emqx_ws_connection: call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {noproc, _} -> % emqx_connection: gen_server:call _ : {noproc, _} -> % emqx_connection: gen_server:call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
ok;
_ : {shutdown, _} -> _ : {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {{shutdown, _}, _} -> _ : {{shutdown, _}, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
ok;
_ : {timeout, {gen_server, call, _}} -> _ : {timeout, {gen_server, call, _}} ->
?tp(warning, "session_kick_timeout", ?LOG(warning, "session_kick_timeout: ~p, action: ~p, "
#{pid => Pid, "stale_channel: ~p",
action => Action, [Pid, Action, stale_channel_info(Pid)]),
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid); ok = force_kill(Pid);
_ : Error : St -> _ : Error ->
?tp(error, "session_kick_exception", ?LOG(error, "session_kick_exception: ~p, action: ~p, "
#{pid => Pid, "reason: ~p, stacktrace: ~p, stale_channel: ~p",
action => Action, [Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]),
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid) ok = force_kill(Pid)
end. end.

View File

@ -211,9 +211,9 @@ test_kick_session(Action, Reason) ->
end, end,
{Pid1, _} = spawn_monitor(FakeSessionFun), {Pid1, _} = spawn_monitor(FakeSessionFun),
{Pid2, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun),
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo),
?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
case Reason of case Reason of
noproc -> exit(Pid1, kill), exit(Pid2, kill); noproc -> exit(Pid1, kill), exit(Pid2, kill);
@ -225,15 +225,18 @@ test_kick_session(Action, Reason) ->
end, end,
case Reason =:= timeout orelse Reason =:= noproc of case Reason =:= timeout orelse Reason =:= noproc of
true -> true ->
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R));
false -> false ->
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R))
end, end,
ok = flush_emqx_pool(), ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
rand_client_id() ->
list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())).
%% Channel deregistration is delegated to emqx_pool as a sync tasks. %% Channel deregistration is delegated to emqx_pool as a sync tasks.
%% The emqx_pool is pool of workers, and there is no way to know %% The emqx_pool is pool of workers, and there is no way to know
%% which worker was picked for the last deregistration task. %% which worker was picked for the last deregistration task.