From 7e2ca9e2874979b89e387a2581d9eded7eb4b6e0 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sat, 30 Oct 2021 14:07:15 +0200 Subject: [PATCH 1/5] fix(session): force kill session for 'kick' and 'discard' Prior to this fix, 'kick' and 'discard' calls may time out (or fail for other reasons), failures lead to only a log, then continue to allow the new session to get registered. As a result, in case a client is stuck, there is no way to force it to step down, ending up with multiple connections (sessions) for the client ID in the dashboard. After this fix, the stale pids are notified to shutdown via a gen_server:call, and forced with an exit(Pid, kill) for any exception that happened in the gen_server:call --- apps/emqx/src/emqx_cm.erl | 156 ++++++++++++++++++++----------- apps/emqx/test/emqx_cm_SUITE.erl | 129 +++++++++++++++++-------- 2 files changed, 196 insertions(+), 89 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 1abd5c151..837492cbd 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -74,6 +74,7 @@ %% Internal export -export([ stats_fun/0 + , clean_down/1 , mark_channel_connected/1 , mark_channel_disconnected/1 , get_connected_client_count/0 @@ -100,7 +101,9 @@ %% Server name -define(CM, ?MODULE). --define(T_TAKEOVER, 15000). +-define(T_KICK, 5_000). +-define(T_GET_INFO, 5_000). +-define(T_TAKEOVER, 15_000). %% linting overrides -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}} @@ -176,7 +179,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). 
@@ -201,7 +204,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -336,77 +339,120 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> emqx_persistent_session:lookup(ClientId); ConnMod when is_atom(ConnMod) -> + %% TODO: if takeover times out, maybe kill the old? Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {living, ConnMod, ChanPid, Session} end; - takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]). + rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -do_discard_session(ClientId, Pid) -> +%% @private Kick a local stale session to force it step down. +%% If failed to kick (e.g. timeout) force a kill. +%% Keeping the stale pid around, or returning error or raise an exception +%% benefits nobody. +-spec kick_or_kill(kick | discard, module(), pid()) -> ok. +kick_or_kill(Action, ConnMod, Pid) -> try - discard_session(ClientId, Pid) + %% this is essentailly a gen_server:call implemented in emqx_connection + %% and emqx_ws_connection. 
+ %% the handle_call is implemented in emqx_channel + ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) catch _ : noproc -> % emqx_ws_connection: call - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); _ : {noproc, _} -> % emqx_connection: gen_server:call - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; - _ : {'EXIT', {noproc, _}} -> % rpc_call/3 - ?tp(debug, "session_already_gone", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); + _ : {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {{shutdown, _}, _} -> - ?tp(debug, "session_already_shutdown", #{pid => Pid}), - ok; + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "session_kick_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid); _ : Error : St -> - ?tp(error, "failed_to_discard_session", - #{pid => Pid, reason => Error, stacktrace=>St}) + ?tp(error, "session_kick_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid) end. -discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chann_conn_mod(ClientId, ChanPid) of - undefined -> ok; - ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; +force_kill(Pid) -> + exit(Pid, kill), + ok. + +stale_channel_info(Pid) -> + process_info(Pid, [status, message_queue_len, current_stacktrace]). discard_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]). + kick_session(discard, ClientId, ChanPid). + +kick_session(ClientId, ChanPid) -> + kick_session(kick, ClientId, ChanPid). 
+ +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). +kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> + case get_chann_conn_mod(ClientId, ChanPid) of + undefined -> + %% already deregistered + ok; + ConnMod when is_atom(ConnMod) -> + ok = kick_or_kill(Action, ConnMod, ChanPid) + end; +kick_session(Action, ClientId, ChanPid) -> + %% call remote node on the old APIs because we do not know if they have upgraded + %% to have kick_session/3 + Function = case Action of + discard -> discard_session; + kick -> kick_session + end, + try + rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + catch + Error : Reason -> + %% This should mostly be RPC failures. + %% However, if the node is still running the old version + %% code (prior to emqx app 4.3.10) some of the RPC handler + %% exceptions may get propagated to a new version node + ?SLOG(error, #{ msg => "failed_to_kick_session_on_remote_node" + , node => node(ChanPid) + , action => Action + , error => Error + , reason => Reason + }) + end. kick_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; - [ChanPid] -> - kick_session(ClientId, ChanPid); + [] -> + ?SLOG(warning, #{msg => "kiecked_an_unknown_session", + clientid => ClientId}), + ok; ChanPids -> - [ChanPid | StalePids] = lists:reverse(ChanPids), - ?SLOG(warning, #{msg => "more_than_one_channel_found", chan_pids => ChanPids}), - lists:foreach(fun(StalePid) -> - catch discard_session(ClientId, StalePid) - end, StalePids), - kick_session(ClientId, ChanPid) + case length(ChanPids) > 1 of + true -> + ?SLOG(warning, #{msg => "more_than_one_channel_found", + chan_pids => ChanPids}); + false -> ok + end, + lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) end. 
-kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick, ?T_TAKEOVER); - undefined -> - {error, not_found} - end; - -kick_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]). - %% @doc Is clean start? % is_clean_start(#{clean_start := false}) -> false; % is_clean_start(_Attrs) -> true. @@ -448,10 +494,16 @@ lookup_channels(local, ClientId) -> [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)]. %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> error(Reason); - Res -> Res +rpc_call(Node, Fun, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of + {badrpc, Reason} -> + %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler + %% should catch all exceptions and always return 'ok'. + %% This leaves 'badrpc' only possible when there is problem + %% calling the remote node. + error({badrpc, Reason}); + Res -> + Res end. %% @private @@ -491,7 +543,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} mark_channel_disconnected(ChanPid) end, Items), - ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), @@ -527,7 +579,7 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> error:badarg -> undefined end; get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). 
mark_channel_connected(ChanPid) -> ?tp(emqx_cm_connected_client_count_inc, #{}), diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 05f0418c5..8ac246424 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -32,6 +32,12 @@ conn_mod => emqx_connection, receive_maximum => 100}}). +-define(WAIT(PATTERN, TIMEOUT, RET), + fun() -> + receive PATTERN -> RET + after TIMEOUT -> error({timeout, ?LINE}) end + end()). + %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -183,25 +189,95 @@ t_open_session_race_condition(_) -> ok = emqx_pool:flush_async_tasks(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_discard_session(_) -> +t_kick_session_discard_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_discard_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_discard_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_discard_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_discard_noproc(_) -> + test_kick_session(discard, noproc). + +t_kick_session_kick_normal(_) -> + test_kick_session(discard, normal). + +t_kick_session_kick_shutdown(_) -> + test_kick_session(discard, shutdown). + +t_kick_session_kick_shutdown_with_reason(_) -> + test_kick_session(discard, {shutdown, discard}). + +t_kick_session_kick_timeout(_) -> + test_kick_session(discard, timeout). + +t_kick_session_kick_noproc(_) -> + test_kick_session(discard, noproc). 
+ +test_kick_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), + FakeSessionFun = + fun Loop() -> + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard -> + case Reason of + normal -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end + end, + {Pid1, _} = spawn_monitor(FakeSessionFun), + {Pid2, _} = spawn_monitor(FakeSessionFun), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo), + ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo), + ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))), + case Reason of + noproc -> exit(Pid1, kill), exit(Pid2, kill); + _ -> ok + end, + ok = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId) + end, + case Reason =:= timeout orelse Reason =:= noproc of + true -> + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)); + false -> + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), + ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) + end, + ok = flush_emqx_pool(), + ?assertEqual([], emqx_cm:lookup_channels(ClientId)). 
- ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:unregister_channel(ClientId), - ok = emqx_cm:register_channel(ClientId, self(), ConnInfo), - ok = emqx_cm:discard_session(ClientId), - ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), - ok = emqx_cm:discard_session(ClientId), - ok = emqx_cm:unregister_channel(ClientId), - ok = meck:unload(emqx_connection). +%% Channel deregistration is delegated to emqx_pool as a sync tasks. +%% The emqx_pool is pool of workers, and there is no way to know +%% which worker was picked for the last deregistration task. +%% This help function creates a large enough number of async tasks +%% to sync with the pool workers. +%% The number of tasks should be large enough to ensure all workers have +%% the chance to work on at least one of the tasks. +flush_emqx_pool() -> + Self = self(), + L = lists:seq(1, 1000), + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). t_discard_session_race(_) -> ClientId = rand_client_id(), @@ -234,27 +310,6 @@ t_takeover_session(_) -> {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). 
-t_kick_session(_) -> - Info = #{conninfo := ConnInfo} = ?ChanInfo, - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), - ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), - {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), - test = emqx_cm:kick_session(<<"clientid">>), - erlang:spawn_link( - fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), - ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []), - - timer:sleep(1000) - end), - ct:sleep(100), - test = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). - t_all_channels(_) -> ?assertEqual(true, is_list(emqx_cm:all_channels())). From dd771c5568a377006899e7ae75bcba525e8ea55e Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sat, 30 Oct 2021 15:22:01 +0200 Subject: [PATCH 2/5] fix(emqx_mgmt_cli): idempotent kick. 
now it always returns ok --- apps/emqx_management/src/emqx_mgmt_cli.erl | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index d664828bf..cdbff5329 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -142,10 +142,8 @@ clients(["show", ClientId]) -> if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> - case emqx_cm:kick_session(bin(ClientId)) of - ok -> emqx_ctl:print("ok~n"); - _ -> emqx_ctl:print("Not Found.~n") - end; + ok = emqx_cm:kick_session(bin(ClientId)), + emqx_ctl:print("ok~n"); clients(_) -> emqx_ctl:usage([{"clients list", "List all clients"}, From 9e333ac8b3d21e5d8543824167de1591820e1bb5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 14:12:47 +0800 Subject: [PATCH 3/5] fix(emqx_cm): fix typos --- apps/emqx/src/emqx_cm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 837492cbd..379614826 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -440,7 +440,7 @@ kick_session(Action, ClientId, ChanPid) -> kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> - ?SLOG(warning, #{msg => "kiecked_an_unknown_session", + ?SLOG(warning, #{msg => "kicked_an_unknown_session", clientid => ClientId}), ok; ChanPids -> From a113b9b65d2317d75c451e17704b36094f89db77 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 18:28:53 +0800 Subject: [PATCH 4/5] chore: fix elvis warnings --- apps/emqx_management/src/emqx_mgmt_cli.erl | 88 +++++++++++++--------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index cdbff5329..94aff7b69 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -79,10 +79,12 @@ 
broker([]) -> emqx_ctl:print("~-10s: ~p~n", [uptime, emqx_sys:uptime()]); broker(["stats"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) || {Stat, Val} <- lists:sort(emqx_stats:getstats())]; + [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) + || {Stat, Val} <- lists:sort(emqx_stats:getstats())]; broker(["metrics"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqx_metrics:all())]; + [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) + || {Metric, Val} <- lists:sort(emqx_metrics:all())]; broker(_) -> emqx_ctl:usage([{"broker", "Show broker version, uptime and description"}, @@ -207,10 +209,11 @@ subscriptions(["del", ClientId, Topic]) -> end; subscriptions(_) -> - emqx_ctl:usage([{"subscriptions list", "List all subscriptions"}, - {"subscriptions show ", "Show subscriptions of a client"}, - {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete a static subscription manually"}]). + emqx_ctl:usage( + [{"subscriptions list", "List all subscriptions"}, + {"subscriptions show ", "Show subscriptions of a client"}, + {"subscriptions add ", "Add a static subscription manually"}, + {"subscriptions del ", "Delete a static subscription manually"}]). 
if_valid_qos(QoS, Fun) -> try list_to_integer(QoS) of @@ -279,14 +282,17 @@ vm(["memory"]) -> [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; vm(["process"]) -> - [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; + [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; vm(["io"]) -> IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))), - [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) || Key <- [max_fds, active_fds]]; + [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) + || Key <- [max_fds, active_fds]]; vm(["ports"]) -> - [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; + [emqx_ctl:print("ports/~-18s: ~w~n", [Name, erlang:system_info(Key)]) + || {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; vm(_) -> emqx_ctl:usage([{"vm all", "Show info of Erlang VM"}, @@ -323,8 +329,14 @@ log(["primary-level", Level]) -> emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]); log(["handlers", "list"]) -> - _ = [emqx_ctl:print("LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", [Id, Level, Dst, Status]) - || #{id := Id, level := Level, dst := Dst, status := Status} <- emqx_logger:get_log_handlers()], + _ = [emqx_ctl:print( + "LogHandler(id=~ts, level=~ts, destination=~ts, status=~ts)~n", + [Id, Level, Dst, Status] + ) + || #{id := Id, + level := Level, + dst := Dst, + status := Status} <- emqx_logger:get_log_handlers()], ok; log(["handlers", "start", HandlerId]) -> @@ -351,21 +363,25 @@ log(["handlers", "set-level", HandlerId, Level]) -> end; log(_) -> - emqx_ctl:usage([{"log set-level ", "Set the overall log level"}, - {"log primary-level", "Show the primary log level now"}, - {"log 
primary-level ","Set the primary log level"}, - {"log handlers list", "Show log handlers"}, - {"log handlers start ", "Start a log handler"}, - {"log handlers stop ", "Stop a log handler"}, - {"log handlers set-level ", "Set log level of a log handler"}]). + emqx_ctl:usage( + [{"log set-level ", "Set the overall log level"}, + {"log primary-level", "Show the primary log level now"}, + {"log primary-level ","Set the primary log level"}, + {"log handlers list", "Show log handlers"}, + {"log handlers start ", "Start a log handler"}, + {"log handlers stop ", "Stop a log handler"}, + {"log handlers set-level ", "Set log level of a log handler"}]). %%-------------------------------------------------------------------- %% @doc Trace Command trace(["list"]) -> lists:foreach(fun({{Who, Name}, {Level, LogFile}}) -> - emqx_ctl:print("Trace(~ts=~ts, level=~ts, destination=~p)~n", [Who, Name, Level, LogFile]) - end, emqx_tracer:lookup_traces()); + emqx_ctl:print( + "Trace(~ts=~ts, level=~ts, destination=~p)~n", + [Who, Name, Level, LogFile] + ) + end, emqx_tracer:lookup_traces()); trace(["stop", "client", ClientId]) -> trace_off(clientid, ClientId); @@ -489,10 +505,11 @@ authz(["cache-clean", ClientId]) -> emqx_mgmt:clean_authz_cache(ClientId); authz(_) -> - emqx_ctl:usage([{"authz cache-clean all", "Clears authorization cache on all nodes"}, - {"authz cache-clean node ", "Clears authorization cache on given node"}, - {"authz cache-clean ", "Clears authorization cache for given client"} - ]). + emqx_ctl:usage( + [{"authz cache-clean all", "Clears authorization cache on all nodes"}, + {"authz cache-clean node ", "Clears authorization cache on given node"}, + {"authz cache-clean ", "Clears authorization cache for given client"} + ]). 
%%-------------------------------------------------------------------- @@ -560,23 +577,24 @@ print({client, {ClientId, ChanPid}}) -> maps:with([peername, clean_start, keepalive, expiry_interval, connected_at, disconnected_at], ConnInfo), maps:with([created_at], Session)]), - InfoKeys = [clientid, username, peername, - clean_start, keepalive, expiry_interval, - subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, + InfoKeys = [clientid, username, peername, clean_start, keepalive, + expiry_interval, subscriptions_cnt, inflight_cnt, + awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of true -> [disconnected_at]; false -> [] end, Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000}, - emqx_ctl:print("Client(~ts, username=~ts, peername=~ts, " - "clean_start=~ts, keepalive=~w, session_expiry_interval=~w, " - "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, " - "connected=~ts, created_at=~w, connected_at=~w" ++ - case maps:is_key(disconnected_at, Info1) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, + emqx_ctl:print( + "Client(~ts, username=~ts, peername=~ts, clean_start=~ts, " + "keepalive=~w, session_expiry_interval=~w, subscriptions=~w, " + "inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, " + "dropped_msgs=~w, connected=~ts, created_at=~w, connected_at=~w" + ++ case maps:is_key(disconnected_at, Info1) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, [format(K, maps:get(K, Info1)) || K <- InfoKeys]); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> From ff23e9dde13bc264470d72a5e4e2cca24437a3a7 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 15 Nov 2021 22:23:50 +0100 Subject: [PATCH 5/5] test: sync with emqx_cm process before flushing emqx_pool --- apps/emqx/test/emqx_cm_SUITE.erl | 3 ++- 1 file changed, 
2 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 8ac246424..bce7f0202 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -185,7 +185,7 @@ t_open_session_race_condition(_) -> exit(Winner, kill), receive {'DOWN', _, process, Winner, _} -> ok end, - ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync + ignored = gen_server:call(?CM, ignore, infinity), %% sync ok = emqx_pool:flush_async_tasks(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). @@ -263,6 +263,7 @@ test_kick_session(Action, Reason) -> ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) end, + ignored = gen_server:call(?CM, ignore, infinity), % sync ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)).