From 50bceee9ab3a244c49b7e78403e346872b9107b0 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Mar 2024 18:31:57 +0800 Subject: [PATCH] fix(stats): `'subscribers.count'` contains shared-subscriber --- apps/emqx/src/emqx_broker.erl | 18 ---------------- apps/emqx/src/emqx_broker_helper.erl | 31 +++++++++++++++++++++++++++- apps/emqx/test/emqx_broker_SUITE.erl | 2 +- apps/emqx/test/emqx_stats_SUITE.erl | 6 +++--- changes/ce/fix-12765.en.md | 2 ++ 5 files changed, 36 insertions(+), 23 deletions(-) create mode 100644 changes/ce/fix-12765.en.md diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index b20c3a15f..8c1239892 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -60,9 +60,6 @@ -export([topics/0]). -%% Stats fun --export([stats_fun/0]). - %% gen_server callbacks -export([ init/1, @@ -469,21 +466,6 @@ set_subopts(SubPid, Topic, NewOpts) -> topics() -> emqx_router:topics(). -%%-------------------------------------------------------------------- -%% Stats fun -%%-------------------------------------------------------------------- - -stats_fun() -> - safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'), - safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'), - safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max'). - -safe_update_stats(Tab, Stat, MaxStat) -> - case ets:info(Tab, size) of - undefined -> ok; - Size -> emqx_stats:setstat(Stat, MaxStat, Size) - end. - %%-------------------------------------------------------------------- %% call, cast, pick %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 8562a1968..368398b92 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -18,6 +18,8 @@ -behaviour(gen_server). +-include("emqx_router.hrl"). +-include("emqx_shared_sub.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -33,6 +35,9 @@ reclaim_seq/1 ]). +%% Stats fun +-export([stats_fun/0]). + %% gen_server callbacks -export([ init/1, @@ -99,6 +104,30 @@ create_seq(Topic) -> reclaim_seq(Topic) -> emqx_sequence:reclaim(?SUBSEQ, Topic). +%%-------------------------------------------------------------------- +%% Stats fun +%%-------------------------------------------------------------------- + +stats_fun() -> + safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), + safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'), + safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). + +safe_update_stats(undefined, _Stat, _MaxStat) -> + ok; +safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> + emqx_stats:setstat(Stat, MaxStat, Val). + +subscriber_val() -> + sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). + +sum_subscriber(undefined, undefined) -> undefined; +sum_subscriber(undefined, V2) when is_integer(V2) -> V2; +sum_subscriber(V1, undefined) when is_integer(V1) -> V1; +sum_subscriber(V1, V2) when is_integer(V1), is_integer(V2) -> V1 + V2. + +table_size(Tab) when is_atom(Tab) -> ets:info(Tab, size). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -115,7 +144,7 @@ init([]) -> %% SubMon: SubPid -> SubId ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer - ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + ok = emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index d4bb9e7fc..e106e3375 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -158,7 +158,7 @@ t_stats_fun(Config) when is_list(Config) -> ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>), ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>), %% ensure stats refreshed - emqx_broker:stats_fun(), + emqx_broker_helper:stats_fun(), %% emqx_stats:set_stat is a gen_server cast %% make a synced call sync ignored = gen_server:call(emqx_stats, call, infinity), diff --git a/apps/emqx/test/emqx_stats_SUITE.erl b/apps/emqx/test/emqx_stats_SUITE.erl index 1a672fa67..1c32396ce 100644 --- a/apps/emqx/test/emqx_stats_SUITE.erl +++ b/apps/emqx/test/emqx_stats_SUITE.erl @@ -105,10 +105,10 @@ t_helper(_) -> end end, [ - {"emqx_broker", MkTestFun(emqx_broker, stats_fun)}, - {"emqx_sm", MkTestFun(emqx_sm, stats_fun)}, + {"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)}, {"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)}, - {"emqx_cm", MkTestFun(emqx_cm, stats_fun)} + {"emqx_cm", MkTestFun(emqx_cm, stats_fun)}, + {"emqx_retainer", MkTestFun(emqx_retainer, stats_fun)} ]. with_proc(F) -> diff --git a/changes/ce/fix-12765.en.md b/changes/ce/fix-12765.en.md new file mode 100644 index 000000000..01c13146d --- /dev/null +++ b/changes/ce/fix-12765.en.md @@ -0,0 +1,2 @@ +Make sure stats `'subscribers.count'` `'subscribers.max'` countains shared-subscribers. +It only contains non-shared subscribers previously.