Merge pull request #13095 from thalesmg/fix-monitor-ds-counters-again-r57-20240522
fix(monitor api): add ds connections and subscriptions to old counters
This commit is contained in:
commit
63cb12d7c2
|
@ -264,8 +264,8 @@ merge_cluster_rate(Node, Cluster) ->
|
||||||
%% cluster-synced values
|
%% cluster-synced values
|
||||||
(disconnected_durable_sessions, V, NCluster) ->
|
(disconnected_durable_sessions, V, NCluster) ->
|
||||||
NCluster#{disconnected_durable_sessions => V};
|
NCluster#{disconnected_durable_sessions => V};
|
||||||
(durable_subscriptions, V, NCluster) ->
|
(subscriptions_durable, V, NCluster) ->
|
||||||
NCluster#{durable_subscriptions => V};
|
NCluster#{subscriptions_durable => V};
|
||||||
(topics, V, NCluster) ->
|
(topics, V, NCluster) ->
|
||||||
NCluster#{topics => V};
|
NCluster#{topics => V};
|
||||||
(retained_msg_count, V, NCluster) ->
|
(retained_msg_count, V, NCluster) ->
|
||||||
|
@ -281,7 +281,28 @@ merge_cluster_rate(Node, Cluster) ->
|
||||||
ClusterValue = maps:get(Key, NCluster, 0),
|
ClusterValue = maps:get(Key, NCluster, 0),
|
||||||
NCluster#{Key => Value + ClusterValue}
|
NCluster#{Key => Value + ClusterValue}
|
||||||
end,
|
end,
|
||||||
maps:fold(Fun, Cluster, Node).
|
Metrics = maps:fold(Fun, Cluster, Node),
|
||||||
|
adjust_synthetic_cluster_metrics(Metrics).
|
||||||
|
|
||||||
|
adjust_synthetic_cluster_metrics(Metrics0) ->
|
||||||
|
%% ensure renamed
|
||||||
|
Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0),
|
||||||
|
DSSubs = maps:get(subscriptions_durable, Metrics1, 0),
|
||||||
|
RamSubs = maps:get(subscriptions, Metrics1, 0),
|
||||||
|
DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0),
|
||||||
|
Metrics2 = maps:update_with(
|
||||||
|
subscriptions,
|
||||||
|
fun(Subs) -> Subs + DSSubs end,
|
||||||
|
0,
|
||||||
|
Metrics1
|
||||||
|
),
|
||||||
|
Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2),
|
||||||
|
maps:update_with(
|
||||||
|
connections,
|
||||||
|
fun(RamConns) -> RamConns + DisconnectedDSs end,
|
||||||
|
DisconnectedDSs,
|
||||||
|
Metrics
|
||||||
|
).
|
||||||
|
|
||||||
format({badrpc, Reason}) ->
|
format({badrpc, Reason}) ->
|
||||||
{badrpc, Reason};
|
{badrpc, Reason};
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-include("emqx_dashboard.hrl").
|
-include("emqx_dashboard.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("hocon/include/hocon_types.hrl").
|
-include_lib("hocon/include/hocon_types.hrl").
|
||||||
|
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
@ -159,7 +160,12 @@ dashboard_samplers_fun(Latest) ->
|
||||||
|
|
||||||
monitor_current(get, #{bindings := Bindings}) ->
|
monitor_current(get, #{bindings := Bindings}) ->
|
||||||
RawNode = maps:get(node, Bindings, <<"all">>),
|
RawNode = maps:get(node, Bindings, <<"all">>),
|
||||||
emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1).
|
case emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1) of
|
||||||
|
?OK(Rates) ->
|
||||||
|
?OK(maybe_reject_cluster_only_metrics(RawNode, Rates));
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
-spec current_rate(atom()) ->
|
-spec current_rate(atom()) ->
|
||||||
{error, term()}
|
{error, term()}
|
||||||
|
@ -242,3 +248,12 @@ swagger_desc_format(Format) ->
|
||||||
swagger_desc_format(Format, Type) ->
|
swagger_desc_format(Format, Type) ->
|
||||||
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
|
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
|
||||||
list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).
|
list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).
|
||||||
|
|
||||||
|
maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
|
||||||
|
Rates;
|
||||||
|
maybe_reject_cluster_only_metrics(_Node, Rates) ->
|
||||||
|
ClusterOnlyMetrics = [
|
||||||
|
durable_subscriptions,
|
||||||
|
disconnected_durable_sessions
|
||||||
|
],
|
||||||
|
maps:without(ClusterOnlyMetrics, Rates).
|
||||||
|
|
|
@ -196,13 +196,22 @@ t_monitor_current_api(_) ->
|
||||||
{ok, Rate} = request(["monitor_current"]),
|
{ok, Rate} = request(["monitor_current"]),
|
||||||
[
|
[
|
||||||
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|
||||||
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
|
||||||
|
%% We rename `durable_subscriptions' key.
|
||||||
|
Key =/= durable_subscriptions
|
||||||
],
|
],
|
||||||
|
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
|
||||||
|
?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
|
||||||
|
ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions],
|
||||||
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
|
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
|
||||||
[
|
[
|
||||||
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
|
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate})
|
||||||
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
|
||||||
|
not lists:member(Key, ClusterOnlyMetrics)
|
||||||
],
|
],
|
||||||
|
?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
|
||||||
|
?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
|
||||||
|
?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_monitor_current_api_live_connections(_) ->
|
t_monitor_current_api_live_connections(_) ->
|
||||||
|
@ -290,8 +299,11 @@ t_monitor_reset(_) ->
|
||||||
{ok, Rate} = request(["monitor_current"]),
|
{ok, Rate} = request(["monitor_current"]),
|
||||||
[
|
[
|
||||||
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|
||||||
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
|
||||||
|
%% We rename `durable_subscriptions' key.
|
||||||
|
Key =/= durable_subscriptions
|
||||||
],
|
],
|
||||||
|
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
|
||||||
{ok, _} =
|
{ok, _} =
|
||||||
snabbkaffe:block_until(
|
snabbkaffe:block_until(
|
||||||
?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
|
?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
|
||||||
|
@ -347,8 +359,9 @@ t_persistent_session_stats(_Config) ->
|
||||||
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
||||||
%% instead of 6 here.
|
%% instead of 6 here.
|
||||||
<<"topics">> := 8,
|
<<"topics">> := 8,
|
||||||
<<"durable_subscriptions">> := 4,
|
<<"subscriptions">> := 8,
|
||||||
<<"subscriptions">> := 4
|
<<"subscriptions_ram">> := 4,
|
||||||
|
<<"subscriptions_durable">> := 4
|
||||||
}},
|
}},
|
||||||
request(["monitor_current"])
|
request(["monitor_current"])
|
||||||
)
|
)
|
||||||
|
@ -368,14 +381,15 @@ t_persistent_session_stats(_Config) ->
|
||||||
?retry(1_000, 10, begin
|
?retry(1_000, 10, begin
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{
|
{ok, #{
|
||||||
<<"connections">> := 1,
|
<<"connections">> := 2,
|
||||||
<<"disconnected_durable_sessions">> := 1,
|
<<"disconnected_durable_sessions">> := 1,
|
||||||
%% N.B.: we currently don't perform any deduplication between persistent
|
%% N.B.: we currently don't perform any deduplication between persistent
|
||||||
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
%% and non-persistent routes, so we count `commont/topic' twice and get 8
|
||||||
%% instead of 6 here.
|
%% instead of 6 here.
|
||||||
<<"topics">> := 8,
|
<<"topics">> := 8,
|
||||||
<<"durable_subscriptions">> := 4,
|
<<"subscriptions">> := 8,
|
||||||
<<"subscriptions">> := 4
|
<<"subscriptions_ram">> := 4,
|
||||||
|
<<"subscriptions_durable">> := 4
|
||||||
}},
|
}},
|
||||||
request(["monitor_current"])
|
request(["monitor_current"])
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue