diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index da412126e..1b20753aa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -945,7 +945,7 @@ handle_info({sock_closed, Reason}, Channel = Shutdown -> Shutdown end; -handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> +handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) -> %% Since sock_closed messages can be generated multiple times, %% we can simply ignore errors of this type in the disconnected state. %% e.g. when the socket send function returns an error, there is already diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 10a5dbfff..2e917ca79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -72,6 +72,9 @@ %% Internal export -export([stats_fun/0, clean_down/1]). +%% Test export +-export([register_channel_/3]). + -type(chan_pid() :: pid()). %% Tables for channel management. @@ -88,14 +91,12 @@ %% Batch drain -define(BATCH_SIZE, 100000). --define(T_TAKEOVER, 15000). - %% Server name -define(CM, ?MODULE). --define(T_KICK, 5_000). --define(T_GET_INFO, 5_000). --define(T_TAKEOVER, 15_000). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). +-define(T_TAKEOVER, 15000). %% @doc Start the channel manager. -spec(start_link() -> startlink_ret()). @@ -298,28 +299,26 @@ kick_or_kill(Action, ConnMod, Pid) -> ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]), + ok; _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + ?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]), + ok; _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), + ?LOG(warning, "session_kick_timeout: ~p, action: ~p, " + "stale_channel: ~p", + [Pid, Action, stale_channel_info(Pid)]), ok = force_kill(Pid); - _ : Error : St -> - ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), + _ : Error -> + ?LOG(error, "session_kick_exception: ~p, action: ~p, " + "reason: ~p, stacktrace: ~p, stale_channel: ~p", + [Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]), ok = force_kill(Pid) end. diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 62c745828..ec660429a 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -211,9 +211,9 @@ test_kick_session(Action, Reason) -> end, {Pid1, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun), - ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), - ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), - ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo), ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), case Reason of noproc -> exit(Pid1, kill), exit(Pid2, kill); @@ -225,15 +225,18 @@ test_kick_session(Action, Reason) -> end, case Reason =:= timeout orelse Reason =:= noproc of true -> - ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), - ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)); false -> - ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), - ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R)) end, ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). +rand_client_id() -> + list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())). + %% Channel deregistration is delegated to emqx_pool as a sync tasks. %% The emqx_pool is pool of workers, and there is no way to know %% which worker was picked for the last deregistration task.