fix(monitor api): add ds connections and subscriptions to old counters

Fixes https://emqx.atlassian.net/browse/EMQX-12423
Fixes https://emqx.atlassian.net/browse/EMQX-12267
This commit is contained in:
Thales Macedo Garitezi 2024-05-22 13:49:23 -03:00
parent d4acceb858
commit 5cad4497de
3 changed files with 63 additions and 13 deletions

View File

@ -264,8 +264,8 @@ merge_cluster_rate(Node, Cluster) ->
%% cluster-synced values
(disconnected_durable_sessions, V, NCluster) ->
NCluster#{disconnected_durable_sessions => V};
(durable_subscriptions, V, NCluster) ->
NCluster#{durable_subscriptions => V};
(subscriptions_durable, V, NCluster) ->
NCluster#{subscriptions_durable => V};
(topics, V, NCluster) ->
NCluster#{topics => V};
(retained_msg_count, V, NCluster) ->
@ -281,7 +281,28 @@ merge_cluster_rate(Node, Cluster) ->
ClusterValue = maps:get(Key, NCluster, 0),
NCluster#{Key => Value + ClusterValue}
end,
maps:fold(Fun, Cluster, Node).
Metrics = maps:fold(Fun, Cluster, Node),
adjust_synthetic_cluster_metrics(Metrics).
adjust_synthetic_cluster_metrics(Metrics0) ->
%% ensure renamed
Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0),
DSSubs = maps:get(subscriptions_durable, Metrics1, 0),
RamSubs = maps:get(subscriptions, Metrics1, 0),
DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0),
Metrics2 = maps:update_with(
subscriptions,
fun(Subs) -> Subs + DSSubs end,
0,
Metrics1
),
Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2),
maps:update_with(
connections,
fun(RamConns) -> RamConns + DisconnectedDSs end,
DisconnectedDSs,
Metrics
).
format({badrpc, Reason}) ->
{badrpc, Reason};

View File

@ -19,6 +19,7 @@
-include("emqx_dashboard.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hocon_types.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-behaviour(minirest_api).
@ -159,7 +160,12 @@ dashboard_samplers_fun(Latest) ->
monitor_current(get, #{bindings := Bindings}) ->
RawNode = maps:get(node, Bindings, <<"all">>),
emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1).
case emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1) of
?OK(Rates) ->
?OK(maybe_reject_cluster_only_metrics(RawNode, Rates));
Error ->
Error
end.
-spec current_rate(atom()) ->
{error, term()}
@ -242,3 +248,12 @@ swagger_desc_format(Format) ->
swagger_desc_format(Format, Type) ->
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).
maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
Rates;
maybe_reject_cluster_only_metrics(_Node, Rates) ->
ClusterOnlyMetrics = [
durable_subscriptions,
disconnected_durable_sessions
],
maps:without(ClusterOnlyMetrics, Rates).

View File

@ -196,13 +196,22 @@ t_monitor_current_api(_) ->
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
%% We rename `durable_subscriptions' key.
Key =/= durable_subscriptions
],
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions],
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate})
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
not lists:member(Key, ClusterOnlyMetrics)
],
?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
ok.
t_monitor_current_api_live_connections(_) ->
@ -290,8 +299,11 @@ t_monitor_reset(_) ->
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
%% We rename `durable_subscriptions' key.
Key =/= durable_subscriptions
],
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
@ -347,8 +359,9 @@ t_persistent_session_stats(_Config) ->
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"durable_subscriptions">> := 4,
<<"subscriptions">> := 4
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
request(["monitor_current"])
)
@ -368,14 +381,15 @@ t_persistent_session_stats(_Config) ->
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
<<"connections">> := 1,
<<"connections">> := 2,
<<"disconnected_durable_sessions">> := 1,
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"durable_subscriptions">> := 4,
<<"subscriptions">> := 4
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
request(["monitor_current"])
)