feat(stats): track connected client count for monitoring

To display the number of connected clients correctly in our
monitoring dashboard, we need to count only the connections that
currently have a client attached, excluding persistent sessions whose
client has disconnected. Today, the `connections.count` value shown in
the dashboards also includes those disconnected persistent sessions.
This commit is contained in:
Thales Macedo Garitezi 2021-11-03 17:45:48 -03:00
parent 4b2586fec4
commit 28e23523a5
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
4 changed files with 108 additions and 7 deletions

View File

@ -1536,6 +1536,7 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
emqx_cm:increment_connected_client_count(),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.
@ -1624,6 +1625,7 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
emqx_cm:decrement_connected_client_count(),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%--------------------------------------------------------------------
@ -1725,4 +1727,3 @@ flag(false) -> 0.
%% Replace the value of the field called Name in a #channel{} record.
%% The field's position is looked up in the record's field list; the
%% extra +1 skips the record tag stored in the first tuple element.
set_field(Name, Value, Channel) ->
    FieldNames = record_info(fields, channel),
    FieldIdx = emqx_misc:index_of(Name, FieldNames),
    setelement(FieldIdx + 1, Channel, Value).

View File

@ -72,7 +72,12 @@
]).
%% Internal export
-export([stats_fun/0, clean_down/1]).
-export([ stats_fun/0
, clean_down/1
, increment_connected_client_count/0
, decrement_connected_client_count/0
, get_connected_client_count/0
]).
-type(chan_pid() :: pid()).
@ -141,6 +146,12 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
%% Remove a channel from the registry and from the three channel tables.
%% Before its info entry is deleted, the entry is inspected: if it still
%% records conn_state = connected, the connected-client counter is
%% decremented as part of the cleanup.
%% Returns the result of the final ets:delete_object/2 call.
do_unregister_channel(Chan) ->
    ok = emqx_cm_registry:unregister_channel(Chan),
    true = ets:delete(?CHAN_CONN_TAB, Chan),
    %% Must be read before the info row is deleted below.
    InfoRows = ets:lookup(?CHAN_INFO_TAB, Chan),
    case InfoRows of
        [{_Key, #{conn_state := connected}, _Stats}] ->
            decrement_connected_client_count();
        _NotConnected ->
            skip
    end,
    true = ets:delete(?CHAN_INFO_TAB, Chan),
    ets:delete_object(?CHAN_TAB, Chan).
@ -427,6 +438,7 @@ rpc_call(Node, Fun, Args, Timeout) ->
%% @private
%% Send Msg to the ?CM process without waiting for a reply.
cast(Msg) ->
    gen_server:cast(?CM, Msg).
%% Send Msg to the ?CM process and return its reply
%% (gen_server:call/2, i.e. with the default call timeout).
call(Msg) ->
    gen_server:call(?CM, Msg).
%%--------------------------------------------------------------------
%% gen_server callbacks
@ -438,8 +450,14 @@ init([]) ->
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}.
State = #{ chan_pmon => emqx_pmon:new()
, connected_client_count => 0
},
{ok, State}.
%% Synchronous requests.
%%   {connected_client_count, get} - reply with the counter kept in the
%%                                   server state.
%% Any other request is logged as an error and answered with `ignored'.
handle_call({connected_client_count, get}, _From,
            #{connected_client_count := Count} = State) ->
    {reply, Count, State};
handle_call(Unexpected, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Unexpected]),
    {reply, ignored, State}.
@ -447,7 +465,18 @@ handle_call(Req, _From, State) ->
%% Asynchronous requests.
%%   {registered, {ClientId, ChanPid}} - hand the channel pid to
%%       emqx_pmon:monitor/3 and keep the updated monitor set.
%%   {connected_client_count, inc}     - add one to the counter.
%%   {connected_client_count, dec}     - subtract one, clamped at zero.
%% Both counter updates emit a trace point carrying the value the
%% counter had before the update.
%% Any other message is logged as an error and dropped.
handle_cast({registered, {ClientId, ChanPid}}, #{chan_pmon := PMon0} = State) ->
    {noreply, State#{chan_pmon := emqx_pmon:monitor(ChanPid, ClientId, PMon0)}};
handle_cast({connected_client_count, inc},
            #{connected_client_count := Before} = State) ->
    ?tp(emqx_cm_connected_client_count_inc, #{count_before => Before}),
    {noreply, State#{connected_client_count := Before + 1}};
handle_cast({connected_client_count, dec},
            #{connected_client_count := Before} = State) ->
    ?tp(emqx_cm_connected_client_count_dec, #{count_before => Before}),
    %% Never let the counter drop below zero.
    {noreply, State#{connected_client_count := max(0, Before - 1)}};
handle_cast(Unexpected, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Unexpected]),
    {noreply, State}.
@ -493,3 +522,11 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
%% Asynchronously request that the connected-client counter be
%% incremented. Returns whatever cast/1 returns; the update itself
%% happens later inside the server process.
increment_connected_client_count() ->
    Request = {connected_client_count, inc},
    cast(Request).
%% Asynchronously request that the connected-client counter be
%% decremented. Returns whatever cast/1 returns; the update itself
%% happens later inside the server process.
decrement_connected_client_count() ->
    Request = {connected_client_count, dec},
    cast(Request).
%% Synchronously fetch the current connected-client counter from the
%% server process and return it.
get_connected_client_count() ->
    Request = {connected_client_count, get},
    call(Request).

View File

@ -518,7 +518,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
end,
?tp(info, terminate, #{reason => Reason}),
maybe_raise_excption(Reason).
maybe_raise_exception(Reason).
%% close socket, discard new state, always return ok.
close_socket_ok(State) ->
@ -526,12 +526,12 @@ close_socket_ok(State) ->
ok.
%% tell truth about the original exception
maybe_raise_excption(#{exception := Exception,
maybe_raise_exception(#{exception := Exception,
context := Context,
stacktrace := Stacktrace
}) ->
erlang:raise(Exception, Context, Stacktrace);
maybe_raise_excption(Reason) ->
maybe_raise_exception(Reason) ->
exit(Reason).
%%--------------------------------------------------------------------

View File

@ -23,6 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
@ -277,6 +278,68 @@ t_stats_fun({'end', _Config}) ->
ok = emqx_broker:unsubscribe(<<"topic">>),
ok = emqx_broker:unsubscribe(<<"topic2">>).
%% persistent sessions, when gone, do not contribute to the connected
%% client count
t_connected_client_count_persistent({init, Config}) ->
    %% The test body waits on snabbkaffe trace events, so tracing has to
    %% be running before it starts.
    ok = snabbkaffe:start_trace(),
    %% NOTE(review): presumably so that exits of the linked emqtt
    %% clients do not take the test process down - confirm.
    process_flag(trap_exit, true),
    Config;
t_connected_client_count_persistent(Config) when is_list(Config) ->
    %% Every client below uses the same id with a non-clean start.
    ClientOpts = [{clean_start, false}, {clientid, <<"clientid">>}],
    IncEvent = emqx_cm_connected_client_count_inc,
    DecEvent = emqx_cm_connected_client_count_dec,

    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% first connection: the counter goes up ...
    {ok, FirstConn} = emqtt:start_link(ClientOpts),
    {{ok, _}, _} = wait_for_events(
                     fun() -> emqtt:connect(FirstConn) end,
                     [IncEvent]),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% ... and back down once the client disconnects
    {ok, _} = wait_for_events(
                fun() -> emqtt:disconnect(FirstConn) end,
                [DecEvent]),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),

    %% reconnecting
    {ok, SecondConn} = emqtt:start_link(ClientOpts),
    {{ok, _}, _} = wait_for_events(
                     fun() -> emqtt:connect(SecondConn) end,
                     [IncEvent]),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% taking over: one decrement and one increment are expected,
    %% leaving the counter unchanged
    {ok, ThirdConn} = emqtt:start_link(ClientOpts),
    {{ok, _}, _} = wait_for_events(
                     fun() -> emqtt:connect(ThirdConn) end,
                     [DecEvent, IncEvent]),
    ?assertEqual(1, emqx_cm:get_connected_client_count()),

    %% abnormal exit of channel process
    [ChanPid] = emqx_cm:all_channels(),
    {true, _} = wait_for_events(
                  fun() -> exit(ChanPid, kill) end,
                  [DecEvent]),
    ?assertEqual(0, emqx_cm:get_connected_client_count()),
    ok;
t_connected_client_count_persistent({'end', _Config}) ->
    snabbkaffe:stop(),
    ok.
%% Run Action/0 and wait for trace events whose ?snk_kind is one of Kinds.
%%
%% The subscription is created before Action runs, so events emitted by
%% the action itself are not missed. It asks for as many events as there
%% are entries in Kinds and uses a timeout value of 100.
%%
%% Returns {ActionResult, {ok, Events}} when all expected events arrived,
%% or {ActionResult, timeout} when the wait timed out - whether or not
%% some of the expected events had already been collected.
wait_for_events(Action, Kinds) ->
    Predicate = fun(#{?snk_kind := K}) ->
                        lists:member(K, Kinds)
                end,
    N = length(Kinds),
    Timeout = 100,
    {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
    Res = Action(),
    case snabbkaffe_collector:receive_events(Sub) of
        %% A timeout may come with a partial list of events; matching
        %% only on [] would crash with case_clause in that situation.
        {timeout, _Partial} ->
            {Res, timeout};
        {ok, Events} ->
            {Res, {ok, Events}}
    end.
%% Entry point that delegates to recv_msgs/2 with an empty list as the
%% second argument (presumably the accumulator of received messages -
%% recv_msgs/2 is not visible here, confirm against its definition).
recv_msgs(Count) ->
    Initial = [],
    recv_msgs(Count, Initial).