From 388e6c826259c7cb69ee90ce303b9ac459114a77 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 17 May 2024 10:14:49 -0300 Subject: [PATCH] fix(dssubs): introduce separate gauge for subscriptions from durable sessions Fixes https://emqx.atlassian.net/browse/EMQX-12267 --- apps/emqx/src/emqx_broker_helper.erl | 19 ++++++++++------- apps/emqx/src/emqx_stats.erl | 4 ++++ .../emqx_dashboard/include/emqx_dashboard.hrl | 1 + .../src/emqx_dashboard_monitor.erl | 3 +++ .../src/emqx_dashboard_monitor_api.erl | 5 ++++- .../test/emqx_dashboard_monitor_SUITE.erl | 3 ++- apps/emqx_management/src/emqx_mgmt.erl | 15 +++++++++++-- .../src/emqx_mgmt_api_nodes.erl | 2 +- .../src/emqx_mgmt_api_stats.erl | 21 ++++++++++++------- .../test/emqx_mgmt_api_stats_SUITE.erl | 21 ++++++++++++++----- apps/emqx_prometheus/src/emqx_prometheus.erl | 4 ++++ .../test/emqx_prometheus_data_SUITE.erl | 2 ++ changes/ce/fix-13067.en.md | 1 + 13 files changed, 76 insertions(+), 25 deletions(-) create mode 100644 changes/ce/fix-13067.en.md diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 854e56fc5..4401085c4 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -111,6 +111,11 @@ reclaim_seq(Topic) -> stats_fun() -> safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'), + safe_update_stats( + durable_subscription_count(), + 'durable_subscriptions.count', + 'durable_subscriptions.max' + ), safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). safe_update_stats(undefined, _Stat, _MaxStat) -> @@ -118,15 +123,13 @@ safe_update_stats(undefined, _Stat, _MaxStat) -> safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> emqx_stats:setstat(Stat, MaxStat, Val). +%% N.B.: subscriptions from durable sessions are not tied to any particular node. +%% Therefore, do not sum them with node-local subscriptions. subscription_count() -> - NonPSCount = table_size(?SUBSCRIPTION), - PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(), - case is_integer(NonPSCount) of - true -> - NonPSCount + PSCount; - false -> - PSCount - end. + table_size(?SUBSCRIPTION). + +durable_subscription_count() -> + emqx_persistent_session_bookkeeper:get_subscription_count(). subscriber_val() -> sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 48688acd7..777948e7b 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -109,6 +109,8 @@ %% PubSub stats -define(PUBSUB_STATS, [ + 'durable_subscriptions.count', + 'durable_subscriptions.max', 'topics.count', 'topics.max', 'suboptions.count', @@ -166,6 +168,8 @@ names() -> [ emqx_connections_count, emqx_connections_max, + emqx_durable_subscriptions_count, + emqx_durable_subscriptions_max, emqx_live_connections_count, emqx_live_connections_max, emqx_cluster_sessions_count, diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 40f2ba2b3..61e513a5f 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -72,6 +72,7 @@ ]). -define(GAUGE_SAMPLER_LIST, [ + durable_subscriptions, subscriptions, topics, connections, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index fe0476e6d..f2ebe3831 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) -> Fun = fun %% cluster-synced values + (durable_subscriptions, V, NCluster) -> + NCluster#{durable_subscriptions => V}; (topics, V, NCluster) -> NCluster#{topics => V}; (retained_msg_count, V, NCluster) -> @@ -416,6 +418,7 @@ getstats(Key) -> end. stats(connections) -> emqx_stats:getstat('connections.count'); +stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count'); stats(live_connections) -> emqx_stats:getstat('live_connections.count'); stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count'); stats(topics) -> emqx_stats:getstat('topics.count'); diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 9d9b095f0..1dca9d341 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -194,8 +194,11 @@ swagger_desc(validation_failed) -> swagger_desc_format("Schema validations failed "); swagger_desc(persisted) -> swagger_desc_format("Messages saved to the durable storage "); +swagger_desc(durable_subscriptions) -> + <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(subscriptions) -> - <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>; + <<"Subscriptions at the time of sampling (not considering durable sessions).", + ?APPROXIMATE_DESC>>; swagger_desc(topics) -> <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(connections) -> diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index d581bd5fc..f174e03be 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -345,7 +345,8 @@ t_persistent_session_stats(_Config) -> %% and non-persistent routes, so we count `commont/topic' twice and get 8 %% instead of 6 here. <<"topics">> := 8, - <<"subscriptions">> := 8 + <<"durable_subscriptions">> := 4, + <<"subscriptions">> := 4 }}, request(["monitor_current"]) ) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9177d255e..95303a1e6 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -270,6 +270,12 @@ get_metrics() -> get_metrics(Node) -> unwrap_rpc(emqx_proto_v1:get_metrics(Node)). +aggregated_only_keys() -> + [ + 'durable_subscriptions.count', + 'durable_subscriptions.max' + ]. + get_stats() -> GlobalStatsKeys = [ @@ -294,7 +300,7 @@ get_stats() -> emqx:running_nodes() ) ), - GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))), + GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(emqx_stats:getstats())), maps:merge(CountStats, GlobalStats). delete_keys(List, []) -> @@ -303,7 +309,12 @@ delete_keys(List, [Key | Keys]) -> delete_keys(proplists:delete(Key, List), Keys). get_stats(Node) -> - unwrap_rpc(emqx_proto_v1:get_stats(Node)). + case unwrap_rpc(emqx_proto_v1:get_stats(Node)) of + {error, _} = Error -> + Error; + Stats when is_list(Stats) -> + delete_keys(Stats, aggregated_only_keys()) + end. nodes_info_count(PropList) -> NodeCount = diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index ec9d1272b..9d9ded7b8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -122,7 +122,7 @@ schema("/nodes/:node/stats") -> responses => #{ 200 => mk( - ref(?NODE_STATS_MODULE, node_stats_data), + ref(?NODE_STATS_MODULE, aggregated_data), #{desc => <<"Get node stats successfully">>} ), 404 => not_found() diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index 8d5d964de..5e7d279b1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -60,8 +60,8 @@ schema("/stats") -> #{ 200 => mk( hoconsc:union([ - ref(?MODULE, node_stats_data), - array(ref(?MODULE, aggergate_data)) + array(ref(?MODULE, per_node_data)), + ref(?MODULE, aggregated_data) ]), #{desc => <<"List stats ok">>} ) @@ -82,7 +82,7 @@ fields(aggregate) -> } )} ]; -fields(node_stats_data) -> +fields(aggregated_data) -> [ stats_schema('channels.count', <<"sessions.count">>), stats_schema('channels.max', <<"session.max">>), @@ -106,7 +106,10 @@ fields(node_stats_data) -> stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>), stats_schema( 'subscriptions.count', - <<"Number of current subscriptions, including shared subscriptions">> + << + "Number of current subscriptions, including shared subscriptions," + " but not subscriptions from durable sessions" + >> ), stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>), stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>), @@ -116,14 +119,18 @@ fields(node_stats_data) -> stats_schema('topics.count', <<"Number of current topics">>), stats_schema('topics.max', <<"Historical maximum number of topics">>) ]; -fields(aggergate_data) -> +fields(per_node_data) -> [ {node, mk(string(), #{ desc => <<"Node name">>, example => <<"emqx@127.0.0.1">> - })} - ] ++ fields(node_stats_data). + })}, + stats_schema( + 'durable_subscriptions.count', + <<"Number of current subscriptions from durable sessions in the cluster">> + ) + ] ++ fields(aggregated_data). stats_schema(Name, Desc) -> {Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}. diff --git a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl index 962c004be..1db30a73b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl @@ -25,12 +25,22 @@ all() -> init_per_suite(Config) -> meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']), - emqx_mgmt_api_test_util:init_suite(), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. -end_per_suite(_) -> +end_per_suite(Config) -> + Apps = proplists:get_value(apps, Config), meck:unload(emqx), - emqx_mgmt_api_test_util:end_suite(). + emqx_cth_suite:stop(Apps), + ok. t_stats_api(_) -> S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]), @@ -39,7 +49,8 @@ t_stats_api(_) -> SystemStats1 = emqx_mgmt:get_stats(), Fun1 = fun(Key) -> - ?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1)) + ?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1)), + ?assertNot(is_map_key(<<"durable_subscriptions.count">>, Stats1), #{stats => Stats1}) end, lists:foreach(Fun1, maps:keys(SystemStats1)), StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]), diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 450033f18..667af1a30 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -331,6 +331,8 @@ emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D)); %% pub/sub stats +emqx_collect(K = emqx_durable_subscriptions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_durable_subscriptions_max, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D)); @@ -541,6 +543,8 @@ stats_metric_meta() -> {emqx_subscribers_max, gauge, 'subscribers.max'}, {emqx_subscriptions_count, gauge, 'subscriptions.count'}, {emqx_subscriptions_max, gauge, 'subscriptions.max'}, + {emqx_durable_subscriptions_count, gauge, 'durable_subscriptions.count'}, + {emqx_durable_subscriptions_max, gauge, 'durable_subscriptions.max'}, %% delayed {emqx_delayed_count, gauge, 'delayed.count'}, {emqx_delayed_max, gauge, 'delayed.max'} diff --git a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl index 62c454e64..115279852 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl @@ -402,6 +402,8 @@ assert_json_data__stats(M, Mode) when #{ emqx_connections_count := _, emqx_connections_max := _, + emqx_durable_subscriptions_count := _, + emqx_durable_subscriptions_max := _, emqx_live_connections_count := _, emqx_live_connections_max := _, emqx_sessions_count := _, diff --git a/changes/ce/fix-13067.en.md b/changes/ce/fix-13067.en.md new file mode 100644 index 000000000..fb410ccb0 --- /dev/null +++ b/changes/ce/fix-13067.en.md @@ -0,0 +1 @@ +Adds a new `durable_subscriptions.count` statistic to track subscriptions that are tied to durable sessions. `subscriptions.count` does not include such subscriptions.