feat(stats): periodic reconciliation of connected client count
This commit is contained in:
parent
28e23523a5
commit
2d29ba8550
|
@ -450,10 +450,12 @@ init([]) ->
|
|||
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
|
||||
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
|
||||
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
|
||||
RefreshPeriod = application:get_env(emqx, connected_client_count_refresh_period, 60000),
|
||||
State = #{ chan_pmon => emqx_pmon:new()
|
||||
, connected_client_count => 0
|
||||
, connected_client_refresh_period => RefreshPeriod
|
||||
},
|
||||
{ok, State}.
|
||||
{ok, ensure_refresh_timer(State)}.
|
||||
|
||||
handle_call({connected_client_count, get}, _From,
|
||||
State = #{connected_client_count := CCCount}) ->
|
||||
|
@ -486,7 +488,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
|
|||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
|
||||
{noreply, State#{chan_pmon := PMon1}};
|
||||
|
||||
handle_info({timeout, TRef, refresh_connected_client_count},
|
||||
State0 = #{connected_client_refresh_timer := TRef,
|
||||
connected_client_count := _CCCount0}) ->
|
||||
CCCount = refresh_connected_client_count(),
|
||||
?tp(emqx_cm_refresh_connected_client_count,
|
||||
#{ new_count => CCCount
|
||||
, old_count => _CCCount0
|
||||
}),
|
||||
State = ensure_refresh_timer(State0#{connected_client_count := CCCount}),
|
||||
{noreply, State};
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
@ -530,3 +541,11 @@ decrement_connected_client_count() ->
|
|||
|
||||
get_connected_client_count() ->
|
||||
call({connected_client_count, get}).
|
||||
|
||||
ensure_refresh_timer(State = #{connected_client_refresh_period := RefreshPeriod}) ->
|
||||
TRef = emqx_misc:start_timer(RefreshPeriod, refresh_connected_client_count),
|
||||
State#{connected_client_refresh_timer => TRef}.
|
||||
|
||||
refresh_connected_client_count() ->
|
||||
Spec = [{{'_', #{conn_state => connected}, '_'}, [], [true]}],
|
||||
ets:select_count(?CHAN_INFO_TAB, Spec).
|
||||
|
|
|
@ -325,6 +325,45 @@ t_connected_client_count_persistent({'end', _Config}) ->
|
|||
snabbkaffe:stop(),
|
||||
ok.
|
||||
|
||||
t_connected_client_count_refresh({init, Config}) ->
|
||||
ok = snabbkaffe:start_trace(),
|
||||
OldConfig = application:get_env(emqx, connected_client_count_refresh_period),
|
||||
application:set_env(emqx, connected_client_count_refresh_period, 100),
|
||||
ok = supervisor:terminate_child(emqx_cm_sup, manager),
|
||||
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
|
||||
[{old_config, OldConfig} | Config];
|
||||
t_connected_client_count_refresh(Config) when is_list(Config) ->
|
||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||
{ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
||||
{{ok, _}, _} = wait_for_events(
|
||||
fun() -> emqtt:connect(ConnPid) end,
|
||||
[emqx_cm_connected_client_count_inc]
|
||||
),
|
||||
%% simulate count mismatch
|
||||
insert_fake_channels(),
|
||||
?block_until(
|
||||
#{ ?snk_kind := emqx_cm_refresh_connected_client_count
|
||||
, new_count := 10
|
||||
, old_count := 1
|
||||
},
|
||||
150
|
||||
),
|
||||
?assertEqual(10, emqx_cm:get_connected_client_count()),
|
||||
ok;
|
||||
t_connected_client_count_refresh({'end', Config}) ->
|
||||
OldConfig = proplists:get_value(old_config, Config),
|
||||
case OldConfig of
|
||||
undefined ->
|
||||
skip;
|
||||
_ ->
|
||||
application:set_env(emqx, connected_client_count_refresh_period, OldConfig)
|
||||
end,
|
||||
snabbkaffe:stop(),
|
||||
ets:delete_all_objects(emqx_channel_info),
|
||||
ok = supervisor:terminate_child(emqx_cm_sup, manager),
|
||||
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
|
||||
ok.
|
||||
|
||||
wait_for_events(Action, Kinds) ->
|
||||
Predicate = fun(#{?snk_kind := K}) ->
|
||||
lists:member(K, Kinds)
|
||||
|
@ -340,6 +379,27 @@ wait_for_events(Action, Kinds) ->
|
|||
{Res, {ok, Events}}
|
||||
end.
|
||||
|
||||
insert_fake_channels() ->
|
||||
%% Insert copies to simulate missed counts
|
||||
Tab = emqx_channel_info,
|
||||
Key = ets:first(Tab),
|
||||
[{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key),
|
||||
lists:foreach(
|
||||
fun(N) ->
|
||||
ClientID = "fake" ++ integer_to_list(N),
|
||||
ets:insert(Tab, {{ClientID, undefined}, ChanInfo, Stats})
|
||||
end,
|
||||
lists:seq(1, 9)
|
||||
),
|
||||
%% these should not be counted
|
||||
lists:foreach(
|
||||
fun(N) ->
|
||||
ClientID = "fake" ++ integer_to_list(N),
|
||||
ets:insert(Tab, {{ClientID, undefined}, ChanInfo#{conn_state := disconnected}, Stats})
|
||||
end,
|
||||
lists:seq(10, 20)
|
||||
).
|
||||
|
||||
recv_msgs(Count) ->
|
||||
recv_msgs(Count, []).
|
||||
|
||||
|
|
Loading…
Reference in New Issue