From 28e23523a53077abd2c13c3abd28ce9e8ecbb873 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 3 Nov 2021 17:45:48 -0300 Subject: [PATCH] feat(stats): track connected client count for monitoring In order to correctly display the number of connected clients in our monitor dashboard, we need to track those connections that are actually connected to clients, not considering connections from persistent sessions that are disconnected. Today, the `connections.count` that is displayed in the dashboards considers those disconnected persistent sessions as well. --- src/emqx_channel.erl | 3 +- src/emqx_cm.erl | 43 ++++++++++++++++++++++++-- src/emqx_connection.erl | 6 ++-- test/emqx_broker_SUITE.erl | 63 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 7 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7bfef472d..fa64f3528 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1536,6 +1536,7 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + emqx_cm:increment_connected_client_count(), Channel#channel{conninfo = NConnInfo, conn_state = connected }. @@ -1624,6 +1625,7 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + emqx_cm:decrement_connected_client_count(), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- @@ -1725,4 +1727,3 @@ flag(false) -> 0. set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos+1, Channel, Value). - diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 23f078568..2ecacabc2 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -72,7 +72,12 @@ ]). %% Internal export --export([stats_fun/0, clean_down/1]). +-export([ stats_fun/0 + , clean_down/1 + , increment_connected_client_count/0 + , decrement_connected_client_count/0 + , get_connected_client_count/0 + ]). -type(chan_pid() :: pid()). @@ -141,6 +146,12 @@ unregister_channel(ClientId) when is_binary(ClientId) -> do_unregister_channel(Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), true = ets:delete(?CHAN_CONN_TAB, Chan), + case ets:lookup(?CHAN_INFO_TAB, Chan) of + [{_, #{conn_state := connected}, _}] -> + decrement_connected_client_count(); + _ -> + skip + end, true = ets:delete(?CHAN_INFO_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan). @@ -427,6 +438,7 @@ rpc_call(Node, Fun, Args, Timeout) -> %% @private cast(Msg) -> gen_server:cast(?CM, Msg). +call(Msg) -> gen_server:call(?CM, Msg). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -438,8 +450,14 @@ init([]) -> ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), - {ok, #{chan_pmon => emqx_pmon:new()}}. + State = #{ chan_pmon => emqx_pmon:new() + , connected_client_count => 0 + }, + {ok, State}. +handle_call({connected_client_count, get}, _From, + State = #{connected_client_count := CCCount}) -> + {reply, CCCount, State}; handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -447,7 +465,18 @@ handle_call(Req, _From, State) -> handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) -> PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon), {noreply, State#{chan_pmon := PMon1}}; - +handle_cast({connected_client_count, inc}, + State0 = #{connected_client_count := CCCount0}) -> + ?tp(emqx_cm_connected_client_count_inc, #{count_before => CCCount0}), + CCCount = CCCount0 + 1, + State = State0#{connected_client_count := CCCount}, + {noreply, State}; +handle_cast({connected_client_count, dec}, + State0 = #{connected_client_count := CCCount0}) -> + ?tp(emqx_cm_connected_client_count_dec, #{count_before => CCCount0}), + CCCount = max(0, CCCount0 - 1), + State = State0#{connected_client_count := CCCount}, + {noreply, State}; handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -493,3 +522,11 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> get_chann_conn_mod(ClientId, ChanPid) -> rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). +increment_connected_client_count() -> + cast({connected_client_count, inc}). + +decrement_connected_client_count() -> + cast({connected_client_count, dec}). + +get_connected_client_count() -> + call({connected_client_count, get}). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ab91c02b4..84086015d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -518,7 +518,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) end, ?tp(info, terminate, #{reason => Reason}), - maybe_raise_excption(Reason). + maybe_raise_exception(Reason). %% close socket, discard new state, always return ok. close_socket_ok(State) -> @@ -526,12 +526,12 @@ close_socket_ok(State) -> ok. %% tell truth about the original exception -maybe_raise_excption(#{exception := Exception, +maybe_raise_exception(#{exception := Exception, context := Context, stacktrace := Stacktrace }) -> erlang:raise(Exception, Context, Stacktrace); -maybe_raise_excption(Reason) -> +maybe_raise_exception(Reason) -> exit(Reason). %%-------------------------------------------------------------------- diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 4cef68660..ff1a30fe2 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -277,6 +278,68 @@ t_stats_fun({'end', _Config}) -> ok = emqx_broker:unsubscribe(<<"topic">>), ok = emqx_broker:unsubscribe(<<"topic2">>). +%% persistent sessions, when gone, do not contribute to connect client +%% count +t_connected_client_count_persistent({init, Config}) -> + ok = snabbkaffe:start_trace(), + process_flag(trap_exit, true), + Config; +t_connected_client_count_persistent(Config) when is_list(Config) -> + ?assertEqual(0, emqx_cm:get_connected_client_count()), + {ok, ConnPid0} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid0) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + {ok, _} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid0) end, + [emqx_cm_connected_client_count_dec] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + %% reconnecting + {ok, ConnPid1} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid1) end, + [emqx_cm_connected_client_count_inc] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% taking over + {ok, ConnPid2} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid2) end, + [ emqx_cm_connected_client_count_dec + , emqx_cm_connected_client_count_inc + ] + ), + ?assertEqual(1, emqx_cm:get_connected_client_count()), + %% abnormal exit of channel process + [ChanPid] = emqx_cm:all_channels(), + {true, _} = wait_for_events( + fun() -> exit(ChanPid, kill) end, + [emqx_cm_connected_client_count_dec] + ), + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_persistent({'end', _Config}) -> + snabbkaffe:stop(), + ok. + +wait_for_events(Action, Kinds) -> + Predicate = fun(#{?snk_kind := K}) -> + lists:member(K, Kinds) + end, + N = length(Kinds), + Timeout = 100, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + Res = Action(), + case snabbkaffe_collector:receive_events(Sub) of + {timeout, []} -> + {Res, timeout}; + {ok, Events} -> + {Res, {ok, Events}} + end. + recv_msgs(Count) -> recv_msgs(Count, []).