diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index d3b533ebf..8d6a1dbc9 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -264,8 +264,8 @@ merge_cluster_rate(Node, Cluster) -> %% cluster-synced values (disconnected_durable_sessions, V, NCluster) -> NCluster#{disconnected_durable_sessions => V}; - (durable_subscriptions, V, NCluster) -> - NCluster#{durable_subscriptions => V}; + (subscriptions_durable, V, NCluster) -> + NCluster#{subscriptions_durable => V}; (topics, V, NCluster) -> NCluster#{topics => V}; (retained_msg_count, V, NCluster) -> @@ -281,7 +281,28 @@ merge_cluster_rate(Node, Cluster) -> ClusterValue = maps:get(Key, NCluster, 0), NCluster#{Key => Value + ClusterValue} end, - maps:fold(Fun, Cluster, Node). + Metrics = maps:fold(Fun, Cluster, Node), + adjust_synthetic_cluster_metrics(Metrics). + +adjust_synthetic_cluster_metrics(Metrics0) -> + %% ensure renamed + Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0), + DSSubs = maps:get(subscriptions_durable, Metrics1, 0), + RamSubs = maps:get(subscriptions, Metrics1, 0), + DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0), + Metrics2 = maps:update_with( + subscriptions, + fun(Subs) -> Subs + DSSubs end, + 0, + Metrics1 + ), + Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2), + maps:update_with( + connections, + fun(RamConns) -> RamConns + DisconnectedDSs end, + DisconnectedDSs, + Metrics + ). format({badrpc, Reason}) -> {badrpc, Reason}; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index c8bb9c8be..3d9536299 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -19,6 +19,7 @@ -include("emqx_dashboard.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hocon_types.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -behaviour(minirest_api). @@ -159,7 +160,12 @@ dashboard_samplers_fun(Latest) -> monitor_current(get, #{bindings := Bindings}) -> RawNode = maps:get(node, Bindings, <<"all">>), - emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1). + case emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1) of + ?OK(Rates) -> + ?OK(maybe_reject_cluster_only_metrics(RawNode, Rates)); + Error -> + Error + end. -spec current_rate(atom()) -> {error, term()} @@ -242,3 +248,12 @@ swagger_desc_format(Format) -> swagger_desc_format(Format, Type) -> Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL), list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])). + +maybe_reject_cluster_only_metrics(<<"all">>, Rates) -> + Rates; +maybe_reject_cluster_only_metrics(_Node, Rates) -> + ClusterOnlyMetrics = [ + durable_subscriptions, + disconnected_durable_sessions + ], + maps:without(ClusterOnlyMetrics, Rates). diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 59951faa9..8286810db 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -196,13 +196,22 @@ t_monitor_current_api(_) -> {ok, Rate} = request(["monitor_current"]), [ ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) - || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST, + %% We rename `durable_subscriptions' key. + Key =/= durable_subscriptions ], + ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)), + ?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)), + ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions], {ok, NodeRate} = request(["monitor_current", "nodes", node()]), [ - ?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate)) - || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST + ?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate}) + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST, + not lists:member(Key, ClusterOnlyMetrics) ], + ?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)), + ?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)), + ?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)), ok. t_monitor_current_api_live_connections(_) -> @@ -290,8 +299,11 @@ t_monitor_reset(_) -> {ok, Rate} = request(["monitor_current"]), [ ?assert(maps:is_key(atom_to_binary(Key, utf8), Rate)) - || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST + || Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST, + %% We rename `durable_subscriptions' key. + Key =/= durable_subscriptions ], + ?assert(maps:is_key(<<"subscriptions_durable">>, Rate)), {ok, _} = snabbkaffe:block_until( ?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}), @@ -347,8 +359,9 @@ t_persistent_session_stats(_Config) -> %% and non-persistent routes, so we count `commont/topic' twice and get 8 %% instead of 6 here. <<"topics">> := 8, - <<"durable_subscriptions">> := 4, - <<"subscriptions">> := 4 + <<"subscriptions">> := 8, + <<"subscriptions_ram">> := 4, + <<"subscriptions_durable">> := 4 }}, request(["monitor_current"]) ) @@ -368,14 +381,15 @@ t_persistent_session_stats(_Config) -> ?retry(1_000, 10, begin ?assertMatch( {ok, #{ - <<"connections">> := 1, + <<"connections">> := 2, <<"disconnected_durable_sessions">> := 1, %% N.B.: we currently don't perform any deduplication between persistent %% and non-persistent routes, so we count `commont/topic' twice and get 8 %% instead of 6 here. <<"topics">> := 8, - <<"durable_subscriptions">> := 4, - <<"subscriptions">> := 4 + <<"subscriptions">> := 8, + <<"subscriptions_ram">> := 4, + <<"subscriptions_durable">> := 4 }}, request(["monitor_current"]) )