From 8f853982a65845ffd375d941ccdf1a3e5b886c05 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 4 Nov 2021 09:26:51 -0300 Subject: [PATCH] feat(stats): add `live_connections.{count,max}` stats to `emqx_stats` --- src/emqx_cm.erl | 11 +++++++- src/emqx_stats.erl | 13 ++++++--- test/emqx_broker_SUITE.erl | 55 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 236ded0fe..0f1c37fdb 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -92,6 +92,8 @@ {?CHAN_CONN_TAB, 'connections.count', 'connections.max'} ]). +-define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}). + %% Batch drain -define(BATCH_SIZE, 100000). @@ -516,7 +518,8 @@ clean_down({ChanPid, ClientId}) -> do_unregister_channel({ClientId, ChanPid}). stats_fun() -> - lists:foreach(fun update_stats/1, ?CHAN_STATS). + lists:foreach(fun update_stats/1, ?CHAN_STATS), + update_connected_client_stats(). update_stats({Tab, Stat, MaxStat}) -> case ets:info(Tab, size) of @@ -524,6 +527,12 @@ update_stats({Tab, Stat, MaxStat}) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. +update_connected_client_stats() -> + {CountStat, MaxStat} = ?CONNECTED_CLIENT_STATS, + CCCount = get_connected_client_count(), + emqx_stats:setstat(CountStat, MaxStat, CCCount), + ok. + get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index f53549e65..ba61143ac 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[Stats]"). @@ -67,8 +68,10 @@ %% Connection stats -define(CONNECTION_STATS, - ['connections.count', %% Count of Concurrent Connections - 'connections.max' %% Maximum Number of Concurrent Connections + [ 'connections.count' %% Count of Concurrent Connections + , 'connections.max' %% Maximum Number of Concurrent Connections + , 'live_connections.count' %% Count of connected clients + , 'live_connections.max' %% Maximum number of connected clients ]). %% Channel stats @@ -216,6 +219,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) -> ets:insert(?TAB, {MaxStat, Val}) end, safe_update_element(Stat, Val), + ?tp(emqx_stats_setstat, + #{ count_stat => Stat + , max_stat => MaxStat + , value => Val + }), {noreply, State}; handle_cast({update_interval, Update = #update{name = Name}}, @@ -274,4 +282,3 @@ safe_update_element(Key, Val) -> error:badarg -> ?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val]) end. - diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 6a9a7a478..693f9b51e 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -364,6 +364,40 @@ t_connected_client_count_refresh({'end', Config}) -> {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), ok. +t_connected_client_stats({init, Config}) -> + ok = snabbkaffe:start_trace(), + Config; +t_connected_client_stats(Config) when is_list(Config) -> + ?assertEqual(0, emqx_cm:get_connected_client_count()), + ?assertEqual(0, emqx_stats:getstat('live_connections.count')), + ?assertEqual(0, emqx_stats:getstat('live_connections.max')), + {ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid) end, + [emqx_cm_connected_client_count_inc] + ), + %% ensure stats are synchronized + wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), + ?assertEqual(1, emqx_stats:getstat('live_connections.count')), + ?assertEqual(1, emqx_stats:getstat('live_connections.max')), + ok = emqtt:disconnect(ConnPid), + %% ensure stats are synchronized + wait_for_stats( + fun emqx_cm:stats_fun/0, + [#{count_stat => 'live_connections.count', + max_stat => 'live_connections.max'}] + ), + ?assertEqual(0, emqx_stats:getstat('live_connections.count')), + ?assertEqual(1, emqx_stats:getstat('live_connections.max')), + ok; +t_connected_client_stats({'end', _Config}) -> + ok = snabbkaffe:stop(), + ok. + wait_for_events(Action, Kinds) -> Predicate = fun(#{?snk_kind := K}) -> lists:member(K, Kinds) @@ -379,6 +413,27 @@ wait_for_events(Action, Kinds) -> {Res, {ok, Events}} end. +wait_for_stats(Action, Stats) -> + Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) -> + Stat = maps:take( + [ count_stat + , max_stat + ], Event), + lists:member(Stat, Stats); + (_) -> + false + end, + N = length(Stats), + Timeout = 100, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0), + Res = Action(), + case snabbkaffe_collector:receive_events(Sub) of + {timeout, []} -> + {Res, timeout}; + {ok, Events} -> + {Res, {ok, Events}} + end. + insert_fake_channels() -> %% Insert copies to simulate missed counts Tab = emqx_channel_info,