From 2d29ba8550ad7da4f4370ac15827733f9f8f97b9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 4 Nov 2021 07:44:23 -0300 Subject: [PATCH] feat(stats): periodic reconciliation of connected client count --- src/emqx_cm.erl | 23 +++++++++++++-- test/emqx_broker_SUITE.erl | 60 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 2ecacabc2..236ded0fe 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -450,10 +450,12 @@ init([]) -> ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), + RefreshPeriod = application:get_env(emqx, connected_client_count_refresh_period, 60000), State = #{ chan_pmon => emqx_pmon:new() , connected_client_count => 0 + , connected_client_refresh_period => RefreshPeriod }, - {ok, State}. + {ok, ensure_refresh_timer(State)}. handle_call({connected_client_count, get}, _From, State = #{connected_client_count := CCCount}) -> @@ -486,7 +488,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon} {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), {noreply, State#{chan_pmon := PMon1}}; - +handle_info({timeout, TRef, refresh_connected_client_count}, + State0 = #{connected_client_refresh_timer := TRef, + connected_client_count := _CCCount0}) -> + CCCount = refresh_connected_client_count(), + ?tp(emqx_cm_refresh_connected_client_count, + #{ new_count => CCCount + , old_count => _CCCount0 + }), + State = ensure_refresh_timer(State0#{connected_client_count := CCCount}), + {noreply, State}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. @@ -530,3 +541,11 @@ decrement_connected_client_count() -> get_connected_client_count() -> call({connected_client_count, get}). + +ensure_refresh_timer(State = #{connected_client_refresh_period := RefreshPeriod}) -> + TRef = emqx_misc:start_timer(RefreshPeriod, refresh_connected_client_count), + State#{connected_client_refresh_timer => TRef}. + +refresh_connected_client_count() -> + Spec = [{{'_', #{conn_state => connected}, '_'}, [], [true]}], + ets:select_count(?CHAN_INFO_TAB, Spec). diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index ff1a30fe2..6a9a7a478 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -325,6 +325,45 @@ t_connected_client_count_persistent({'end', _Config}) -> snabbkaffe:stop(), ok. +t_connected_client_count_refresh({init, Config}) -> + ok = snabbkaffe:start_trace(), + OldConfig = application:get_env(emqx, connected_client_count_refresh_period), + application:set_env(emqx, connected_client_count_refresh_period, 100), + ok = supervisor:terminate_child(emqx_cm_sup, manager), + {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), + [{old_config, OldConfig} | Config]; +t_connected_client_count_refresh(Config) when is_list(Config) -> + ?assertEqual(0, emqx_cm:get_connected_client_count()), + {ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {{ok, _}, _} = wait_for_events( + fun() -> emqtt:connect(ConnPid) end, + [emqx_cm_connected_client_count_inc] + ), + %% simulate count mismatch + insert_fake_channels(), + ?block_until( + #{ ?snk_kind := emqx_cm_refresh_connected_client_count + , new_count := 10 + , old_count := 1 + }, + 150 + ), + ?assertEqual(10, emqx_cm:get_connected_client_count()), + ok; +t_connected_client_count_refresh({'end', Config}) -> + OldConfig = proplists:get_value(old_config, Config), + case OldConfig of + undefined -> + skip; + _ -> + application:set_env(emqx, connected_client_count_refresh_period, OldConfig) + end, + snabbkaffe:stop(), + ets:delete_all_objects(emqx_channel_info), + ok = supervisor:terminate_child(emqx_cm_sup, manager), + {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), + ok. + wait_for_events(Action, Kinds) -> Predicate = fun(#{?snk_kind := K}) -> lists:member(K, Kinds) @@ -340,6 +379,27 @@ wait_for_events(Action, Kinds) -> {Res, {ok, Events}} end. +insert_fake_channels() -> + %% Insert copies to simulate missed counts + Tab = emqx_channel_info, + Key = ets:first(Tab), + [{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key), + lists:foreach( + fun(N) -> + ClientID = "fake" ++ integer_to_list(N), + ets:insert(Tab, {{ClientID, undefined}, ChanInfo, Stats}) + end, + lists:seq(1, 9) + ), + %% these should not be counted + lists:foreach( + fun(N) -> + ClientID = "fake" ++ integer_to_list(N), + ets:insert(Tab, {{ClientID, undefined}, ChanInfo#{conn_state := disconnected}, Stats}) + end, + lists:seq(10, 20) + ). + recv_msgs(Count) -> recv_msgs(Count, []).