fix(dssubs): introduce separate gauge for subscriptions from durable sessions
Fixes https://emqx.atlassian.net/browse/EMQX-12267
This commit is contained in:
parent
4d1db9f847
commit
388e6c8262
|
@ -111,6 +111,11 @@ reclaim_seq(Topic) ->
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
|
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
|
||||||
safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
|
safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
|
||||||
|
safe_update_stats(
|
||||||
|
durable_subscription_count(),
|
||||||
|
'durable_subscriptions.count',
|
||||||
|
'durable_subscriptions.max'
|
||||||
|
),
|
||||||
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
|
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
|
||||||
|
|
||||||
safe_update_stats(undefined, _Stat, _MaxStat) ->
|
safe_update_stats(undefined, _Stat, _MaxStat) ->
|
||||||
|
@ -118,15 +123,13 @@ safe_update_stats(undefined, _Stat, _MaxStat) ->
|
||||||
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
|
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
|
||||||
emqx_stats:setstat(Stat, MaxStat, Val).
|
emqx_stats:setstat(Stat, MaxStat, Val).
|
||||||
|
|
||||||
|
%% N.B.: subscriptions from durable sessions are not tied to any particular node.
|
||||||
|
%% Therefore, do not sum them with node-local subscriptions.
|
||||||
subscription_count() ->
|
subscription_count() ->
|
||||||
NonPSCount = table_size(?SUBSCRIPTION),
|
table_size(?SUBSCRIPTION).
|
||||||
PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
|
|
||||||
case is_integer(NonPSCount) of
|
durable_subscription_count() ->
|
||||||
true ->
|
emqx_persistent_session_bookkeeper:get_subscription_count().
|
||||||
NonPSCount + PSCount;
|
|
||||||
false ->
|
|
||||||
PSCount
|
|
||||||
end.
|
|
||||||
|
|
||||||
subscriber_val() ->
|
subscriber_val() ->
|
||||||
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
|
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).
|
||||||
|
|
|
@ -109,6 +109,8 @@
|
||||||
|
|
||||||
%% PubSub stats
|
%% PubSub stats
|
||||||
-define(PUBSUB_STATS, [
|
-define(PUBSUB_STATS, [
|
||||||
|
'durable_subscriptions.count',
|
||||||
|
'durable_subscriptions.max',
|
||||||
'topics.count',
|
'topics.count',
|
||||||
'topics.max',
|
'topics.max',
|
||||||
'suboptions.count',
|
'suboptions.count',
|
||||||
|
@ -166,6 +168,8 @@ names() ->
|
||||||
[
|
[
|
||||||
emqx_connections_count,
|
emqx_connections_count,
|
||||||
emqx_connections_max,
|
emqx_connections_max,
|
||||||
|
emqx_durable_subscriptions_count,
|
||||||
|
emqx_durable_subscriptions_max,
|
||||||
emqx_live_connections_count,
|
emqx_live_connections_count,
|
||||||
emqx_live_connections_max,
|
emqx_live_connections_max,
|
||||||
emqx_cluster_sessions_count,
|
emqx_cluster_sessions_count,
|
||||||
|
|
|
@ -72,6 +72,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(GAUGE_SAMPLER_LIST, [
|
-define(GAUGE_SAMPLER_LIST, [
|
||||||
|
durable_subscriptions,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
topics,
|
topics,
|
||||||
connections,
|
connections,
|
||||||
|
|
|
@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) ->
|
||||||
Fun =
|
Fun =
|
||||||
fun
|
fun
|
||||||
%% cluster-synced values
|
%% cluster-synced values
|
||||||
|
(durable_subscriptions, V, NCluster) ->
|
||||||
|
NCluster#{durable_subscriptions => V};
|
||||||
(topics, V, NCluster) ->
|
(topics, V, NCluster) ->
|
||||||
NCluster#{topics => V};
|
NCluster#{topics => V};
|
||||||
(retained_msg_count, V, NCluster) ->
|
(retained_msg_count, V, NCluster) ->
|
||||||
|
@ -416,6 +418,7 @@ getstats(Key) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
stats(connections) -> emqx_stats:getstat('connections.count');
|
stats(connections) -> emqx_stats:getstat('connections.count');
|
||||||
|
stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count');
|
||||||
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
|
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
|
||||||
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
|
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
|
||||||
stats(topics) -> emqx_stats:getstat('topics.count');
|
stats(topics) -> emqx_stats:getstat('topics.count');
|
||||||
|
|
|
@ -194,8 +194,11 @@ swagger_desc(validation_failed) ->
|
||||||
swagger_desc_format("Schema validations failed ");
|
swagger_desc_format("Schema validations failed ");
|
||||||
swagger_desc(persisted) ->
|
swagger_desc(persisted) ->
|
||||||
swagger_desc_format("Messages saved to the durable storage ");
|
swagger_desc_format("Messages saved to the durable storage ");
|
||||||
|
swagger_desc(durable_subscriptions) ->
|
||||||
|
<<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(subscriptions) ->
|
swagger_desc(subscriptions) ->
|
||||||
<<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
<<"Subscriptions at the time of sampling (not considering durable sessions).",
|
||||||
|
?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(topics) ->
|
swagger_desc(topics) ->
|
||||||
<<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
|
<<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(connections) ->
|
swagger_desc(connections) ->
|
||||||
|
|
|
@ -345,7 +345,8 @@ t_persistent_session_stats(_Config) ->
|
||||||
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
||||||
%% instead of 6 here.
|
%% instead of 6 here.
|
||||||
<<"topics">> := 8,
|
<<"topics">> := 8,
|
||||||
<<"subscriptions">> := 8
|
<<"durable_subscriptions">> := 4,
|
||||||
|
<<"subscriptions">> := 4
|
||||||
}},
|
}},
|
||||||
request(["monitor_current"])
|
request(["monitor_current"])
|
||||||
)
|
)
|
||||||
|
|
|
@ -270,6 +270,12 @@ get_metrics() ->
|
||||||
get_metrics(Node) ->
|
get_metrics(Node) ->
|
||||||
unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
|
unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
|
||||||
|
|
||||||
|
aggregated_only_keys() ->
|
||||||
|
[
|
||||||
|
'durable_subscriptions.count',
|
||||||
|
'durable_subscriptions.max'
|
||||||
|
].
|
||||||
|
|
||||||
get_stats() ->
|
get_stats() ->
|
||||||
GlobalStatsKeys =
|
GlobalStatsKeys =
|
||||||
[
|
[
|
||||||
|
@ -294,7 +300,7 @@ get_stats() ->
|
||||||
emqx:running_nodes()
|
emqx:running_nodes()
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
|
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(emqx_stats:getstats())),
|
||||||
maps:merge(CountStats, GlobalStats).
|
maps:merge(CountStats, GlobalStats).
|
||||||
|
|
||||||
delete_keys(List, []) ->
|
delete_keys(List, []) ->
|
||||||
|
@ -303,7 +309,12 @@ delete_keys(List, [Key | Keys]) ->
|
||||||
delete_keys(proplists:delete(Key, List), Keys).
|
delete_keys(proplists:delete(Key, List), Keys).
|
||||||
|
|
||||||
get_stats(Node) ->
|
get_stats(Node) ->
|
||||||
unwrap_rpc(emqx_proto_v1:get_stats(Node)).
|
case unwrap_rpc(emqx_proto_v1:get_stats(Node)) of
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error;
|
||||||
|
Stats when is_list(Stats) ->
|
||||||
|
delete_keys(Stats, aggregated_only_keys())
|
||||||
|
end.
|
||||||
|
|
||||||
nodes_info_count(PropList) ->
|
nodes_info_count(PropList) ->
|
||||||
NodeCount =
|
NodeCount =
|
||||||
|
|
|
@ -122,7 +122,7 @@ schema("/nodes/:node/stats") ->
|
||||||
responses =>
|
responses =>
|
||||||
#{
|
#{
|
||||||
200 => mk(
|
200 => mk(
|
||||||
ref(?NODE_STATS_MODULE, node_stats_data),
|
ref(?NODE_STATS_MODULE, aggregated_data),
|
||||||
#{desc => <<"Get node stats successfully">>}
|
#{desc => <<"Get node stats successfully">>}
|
||||||
),
|
),
|
||||||
404 => not_found()
|
404 => not_found()
|
||||||
|
|
|
@ -60,8 +60,8 @@ schema("/stats") ->
|
||||||
#{
|
#{
|
||||||
200 => mk(
|
200 => mk(
|
||||||
hoconsc:union([
|
hoconsc:union([
|
||||||
ref(?MODULE, node_stats_data),
|
array(ref(?MODULE, per_node_data)),
|
||||||
array(ref(?MODULE, aggergate_data))
|
ref(?MODULE, aggregated_data)
|
||||||
]),
|
]),
|
||||||
#{desc => <<"List stats ok">>}
|
#{desc => <<"List stats ok">>}
|
||||||
)
|
)
|
||||||
|
@ -82,7 +82,7 @@ fields(aggregate) ->
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(node_stats_data) ->
|
fields(aggregated_data) ->
|
||||||
[
|
[
|
||||||
stats_schema('channels.count', <<"sessions.count">>),
|
stats_schema('channels.count', <<"sessions.count">>),
|
||||||
stats_schema('channels.max', <<"session.max">>),
|
stats_schema('channels.max', <<"session.max">>),
|
||||||
|
@ -106,7 +106,10 @@ fields(node_stats_data) ->
|
||||||
stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
|
stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
|
||||||
stats_schema(
|
stats_schema(
|
||||||
'subscriptions.count',
|
'subscriptions.count',
|
||||||
<<"Number of current subscriptions, including shared subscriptions">>
|
<<
|
||||||
|
"Number of current subscriptions, including shared subscriptions,"
|
||||||
|
" but not subscriptions from durable sessions"
|
||||||
|
>>
|
||||||
),
|
),
|
||||||
stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
|
stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
|
||||||
stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
|
stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
|
||||||
|
@ -116,14 +119,18 @@ fields(node_stats_data) ->
|
||||||
stats_schema('topics.count', <<"Number of current topics">>),
|
stats_schema('topics.count', <<"Number of current topics">>),
|
||||||
stats_schema('topics.max', <<"Historical maximum number of topics">>)
|
stats_schema('topics.max', <<"Historical maximum number of topics">>)
|
||||||
];
|
];
|
||||||
fields(aggergate_data) ->
|
fields(per_node_data) ->
|
||||||
[
|
[
|
||||||
{node,
|
{node,
|
||||||
mk(string(), #{
|
mk(string(), #{
|
||||||
desc => <<"Node name">>,
|
desc => <<"Node name">>,
|
||||||
example => <<"emqx@127.0.0.1">>
|
example => <<"emqx@127.0.0.1">>
|
||||||
})}
|
})},
|
||||||
] ++ fields(node_stats_data).
|
stats_schema(
|
||||||
|
'durable_subscriptions.count',
|
||||||
|
<<"Number of current subscriptions from durable sessions in the cluster">>
|
||||||
|
)
|
||||||
|
] ++ fields(aggregated_data).
|
||||||
|
|
||||||
stats_schema(Name, Desc) ->
|
stats_schema(Name, Desc) ->
|
||||||
{Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.
|
{Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.
|
||||||
|
|
|
@ -25,12 +25,22 @@ all() ->
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
|
meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
Apps = emqx_cth_suite:start(
|
||||||
Config.
|
[
|
||||||
|
emqx,
|
||||||
|
emqx_management,
|
||||||
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
|
),
|
||||||
|
{ok, _Api} = emqx_common_test_http:create_default_app(),
|
||||||
|
[{apps, Apps} | Config].
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(Config) ->
|
||||||
|
Apps = proplists:get_value(apps, Config),
|
||||||
meck:unload(emqx),
|
meck:unload(emqx),
|
||||||
emqx_mgmt_api_test_util:end_suite().
|
emqx_cth_suite:stop(Apps),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_stats_api(_) ->
|
t_stats_api(_) ->
|
||||||
S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]),
|
S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]),
|
||||||
|
@ -39,7 +49,8 @@ t_stats_api(_) ->
|
||||||
SystemStats1 = emqx_mgmt:get_stats(),
|
SystemStats1 = emqx_mgmt:get_stats(),
|
||||||
Fun1 =
|
Fun1 =
|
||||||
fun(Key) ->
|
fun(Key) ->
|
||||||
?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1))
|
?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1)),
|
||||||
|
?assertNot(is_map_key(<<"durable_subscriptions.count">>, Stats1), #{stats => Stats1})
|
||||||
end,
|
end,
|
||||||
lists:foreach(Fun1, maps:keys(SystemStats1)),
|
lists:foreach(Fun1, maps:keys(SystemStats1)),
|
||||||
StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),
|
StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),
|
||||||
|
|
|
@ -331,6 +331,8 @@ emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
|
emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
|
emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
|
||||||
%% pub/sub stats
|
%% pub/sub stats
|
||||||
|
emqx_collect(K = emqx_durable_subscriptions_count, D) -> gauge_metrics(?MG(K, D));
|
||||||
|
emqx_collect(K = emqx_durable_subscriptions_max, D) -> gauge_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
|
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
|
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D));
|
emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D));
|
||||||
|
@ -541,6 +543,8 @@ stats_metric_meta() ->
|
||||||
{emqx_subscribers_max, gauge, 'subscribers.max'},
|
{emqx_subscribers_max, gauge, 'subscribers.max'},
|
||||||
{emqx_subscriptions_count, gauge, 'subscriptions.count'},
|
{emqx_subscriptions_count, gauge, 'subscriptions.count'},
|
||||||
{emqx_subscriptions_max, gauge, 'subscriptions.max'},
|
{emqx_subscriptions_max, gauge, 'subscriptions.max'},
|
||||||
|
{emqx_durable_subscriptions_count, gauge, 'durable_subscriptions.count'},
|
||||||
|
{emqx_durable_subscriptions_max, gauge, 'durable_subscriptions.max'},
|
||||||
%% delayed
|
%% delayed
|
||||||
{emqx_delayed_count, gauge, 'delayed.count'},
|
{emqx_delayed_count, gauge, 'delayed.count'},
|
||||||
{emqx_delayed_max, gauge, 'delayed.max'}
|
{emqx_delayed_max, gauge, 'delayed.max'}
|
||||||
|
|
|
@ -402,6 +402,8 @@ assert_json_data__stats(M, Mode) when
|
||||||
#{
|
#{
|
||||||
emqx_connections_count := _,
|
emqx_connections_count := _,
|
||||||
emqx_connections_max := _,
|
emqx_connections_max := _,
|
||||||
|
emqx_durable_subscriptions_count := _,
|
||||||
|
emqx_durable_subscriptions_max := _,
|
||||||
emqx_live_connections_count := _,
|
emqx_live_connections_count := _,
|
||||||
emqx_live_connections_max := _,
|
emqx_live_connections_max := _,
|
||||||
emqx_sessions_count := _,
|
emqx_sessions_count := _,
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
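Adds a new `durable_subscriptions.count` statistic (with a matching `durable_subscriptions.max`) to track subscriptions that are tied to durable sessions. `subscriptions.count` no longer includes such subscriptions; it now counts only subscriptions that do not belong to durable sessions.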
|
Loading…
Reference in New Issue