fix(stats): `'subscribers.count'` contains shared-subscriber
This commit is contained in:
parent
0f4b148294
commit
50bceee9ab
|
@ -60,9 +60,6 @@
|
|||
|
||||
-export([topics/0]).
|
||||
|
||||
%% Stats fun
|
||||
-export([stats_fun/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([
|
||||
init/1,
|
||||
|
@ -469,21 +466,6 @@ set_subopts(SubPid, Topic, NewOpts) ->
|
|||
topics() ->
|
||||
emqx_router:topics().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Stats fun
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
stats_fun() ->
|
||||
safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'),
|
||||
safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'),
|
||||
safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max').
|
||||
|
||||
safe_update_stats(Tab, Stat, MaxStat) ->
|
||||
case ets:info(Tab, size) of
|
||||
undefined -> ok;
|
||||
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% call, cast, pick
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx_router.hrl").
|
||||
-include("emqx_shared_sub.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("types.hrl").
|
||||
|
||||
|
@ -33,6 +35,9 @@
|
|||
reclaim_seq/1
|
||||
]).
|
||||
|
||||
%% Stats fun
|
||||
-export([stats_fun/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([
|
||||
init/1,
|
||||
|
@ -99,6 +104,30 @@ create_seq(Topic) ->
|
|||
reclaim_seq(Topic) ->
|
||||
emqx_sequence:reclaim(?SUBSEQ, Topic).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Stats fun
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
stats_fun() ->
|
||||
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
|
||||
safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'),
|
||||
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
|
||||
|
||||
safe_update_stats(undefined, _Stat, _MaxStat) ->
|
||||
ok;
|
||||
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
|
||||
emqx_stats:setstat(Stat, MaxStat, Val).
|
||||
|
||||
subscriber_val() ->
|
||||
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
|
||||
|
||||
sum_subscriber(undefined, undefined) -> undefined;
|
||||
sum_subscriber(undefined, V2) when is_integer(V2) -> V2;
|
||||
sum_subscriber(V1, undefined) when is_integer(V1) -> V1;
|
||||
sum_subscriber(V1, V2) when is_integer(V1), is_integer(V2) -> V1 + V2.
|
||||
|
||||
table_size(Tab) when is_atom(Tab) -> ets:info(Tab, size).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -115,7 +144,7 @@ init([]) ->
|
|||
%% SubMon: SubPid -> SubId
|
||||
ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]),
|
||||
%% Stats timer
|
||||
ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
|
||||
ok = emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0),
|
||||
{ok, #{pmon => emqx_pmon:new()}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
|
|
|
@ -158,7 +158,7 @@ t_stats_fun(Config) when is_list(Config) ->
|
|||
ok = emqx_broker:subscribe(<<"topic">>, <<"clientid">>),
|
||||
ok = emqx_broker:subscribe(<<"topic2">>, <<"clientid">>),
|
||||
%% ensure stats refreshed
|
||||
emqx_broker:stats_fun(),
|
||||
emqx_broker_helper:stats_fun(),
|
||||
%% emqx_stats:set_stat is a gen_server cast
|
||||
%% make a synced call sync
|
||||
ignored = gen_server:call(emqx_stats, call, infinity),
|
||||
|
|
|
@ -105,10 +105,10 @@ t_helper(_) ->
|
|||
end
|
||||
end,
|
||||
[
|
||||
{"emqx_broker", MkTestFun(emqx_broker, stats_fun)},
|
||||
{"emqx_sm", MkTestFun(emqx_sm, stats_fun)},
|
||||
{"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)},
|
||||
{"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)},
|
||||
{"emqx_cm", MkTestFun(emqx_cm, stats_fun)}
|
||||
{"emqx_cm", MkTestFun(emqx_cm, stats_fun)},
|
||||
{"emqx_retainer", MkTestFun(emqx_retainer, stats_fun)}
|
||||
].
|
||||
|
||||
with_proc(F) ->
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Make sure stats `'subscribers.count'` `'subscribers.max'` countains shared-subscribers.
|
||||
It only contains non-shared subscribers previously.
|
Loading…
Reference in New Issue