fix(monitor api): fix cluster metric aggregation

Fixes https://emqx.atlassian.net/browse/EMQX-12423
Fixes https://emqx.atlassian.net/browse/EMQX-12267
This commit is contained in:
Thales Macedo Garitezi 2024-05-23 09:31:00 -03:00
parent 398dc97ed6
commit c0e3a81c61
4 changed files with 105 additions and 40 deletions

View File

@ -109,6 +109,7 @@ start(Nodes, ClusterOpts) ->
start(NodeSpecs). start(NodeSpecs).
start(NodeSpecs) -> start(NodeSpecs) ->
emqx_common_test_helpers:clear_screen(),
ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]), ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]),
% 1. Start bare nodes with only basic applications running % 1. Start bare nodes with only basic applications running
ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS), ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),

View File

@ -119,7 +119,7 @@ current_rate(all) ->
current_rate(Node) when Node == node() -> current_rate(Node) when Node == node() ->
try try
{ok, Rate} = do_call(current_rate), {ok, Rate} = do_call(current_rate),
{ok, Rate} {ok, adjust_individual_node_metrics(Rate)}
catch catch
_E:R -> _E:R ->
?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}), ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
@ -156,8 +156,8 @@ current_rate_cluster() ->
case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
{badrpc, Reason} -> {badrpc, Reason} ->
{badrpc, Reason}; {badrpc, Reason};
Rate -> Metrics ->
{ok, Rate} {ok, adjust_synthetic_cluster_metrics(Metrics)}
end. end.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -281,22 +281,23 @@ merge_cluster_rate(Node, Cluster) ->
ClusterValue = maps:get(Key, NCluster, 0), ClusterValue = maps:get(Key, NCluster, 0),
NCluster#{Key => Value + ClusterValue} NCluster#{Key => Value + ClusterValue}
end, end,
Metrics = maps:fold(Fun, Cluster, Node), maps:fold(Fun, Cluster, Node).
adjust_synthetic_cluster_metrics(Metrics).
adjust_individual_node_metrics(Metrics0) ->
%% ensure renamed
emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0).
adjust_synthetic_cluster_metrics(Metrics0) -> adjust_synthetic_cluster_metrics(Metrics0) ->
%% ensure renamed DSSubs = maps:get(subscriptions_durable, Metrics0, 0),
Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0), RamSubs = maps:get(subscriptions, Metrics0, 0),
DSSubs = maps:get(subscriptions_durable, Metrics1, 0), DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics0, 0),
RamSubs = maps:get(subscriptions, Metrics1, 0), Metrics1 = maps:update_with(
DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0),
Metrics2 = maps:update_with(
subscriptions, subscriptions,
fun(Subs) -> Subs + DSSubs end, fun(Subs) -> Subs + DSSubs end,
0, 0,
Metrics1 Metrics0
), ),
Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2), Metrics = maps:put(subscriptions_ram, RamSubs, Metrics1),
maps:update_with( maps:update_with(
connections, connections,
fun(RamConns) -> RamConns + DisconnectedDSs end, fun(RamConns) -> RamConns + DisconnectedDSs end,

View File

@ -253,7 +253,7 @@ maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
Rates; Rates;
maybe_reject_cluster_only_metrics(_Node, Rates) -> maybe_reject_cluster_only_metrics(_Node, Rates) ->
ClusterOnlyMetrics = [ ClusterOnlyMetrics = [
durable_subscriptions, subscriptions_durable,
disconnected_durable_sessions disconnected_durable_sessions
], ],
maps:without(ClusterOnlyMetrics, Rates). maps:without(ClusterOnlyMetrics, Rates).

View File

@ -49,6 +49,8 @@
"}" "}"
>>). >>).
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -79,21 +81,37 @@ end_per_suite(_Config) ->
ok. ok.
init_per_group(persistent_sessions = Group, Config) -> init_per_group(persistent_sessions = Group, Config) ->
Apps = emqx_cth_suite:start( AppSpecsFn = fun(Enable) ->
Port =
case Enable of
true -> "18083";
false -> "0"
end,
[ [
emqx_conf, emqx_conf,
{emqx, "durable_sessions {enable = true}"}, {emqx, "durable_sessions {enable = true}"},
{emqx_retainer, ?BASE_RETAINER_CONF}, {emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management, emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard( emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 18083 }\n" lists:concat([
"dashboard.sample_interval = 1s" "dashboard.listeners.http { bind = " ++ Port ++ " }\n",
"dashboard.sample_interval = 1s\n",
"dashboard.listeners.http.enable = " ++ atom_to_list(Enable)
])
) )
], ]
#{work_dir => emqx_cth_suite:work_dir(Group, Config)} end,
), NodeSpecs = [
{ok, _} = emqx_common_test_http:create_default_app(), {dashboard_monitor1, #{apps => AppSpecsFn(true)}},
[{apps, Apps} | Config]; {dashboard_monitor2, #{apps => AppSpecsFn(false)}}
],
Nodes =
[N1 | _] = emqx_cth_cluster:start(
NodeSpecs,
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
?ON(N1, {ok, _} = emqx_common_test_http:create_default_app()),
[{cluster, Nodes} | Config];
init_per_group(common = Group, Config) -> init_per_group(common = Group, Config) ->
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[ [
@ -111,7 +129,11 @@ init_per_group(common = Group, Config) ->
{ok, _} = emqx_common_test_http:create_default_app(), {ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_group(_Group, Config) -> end_per_group(persistent_sessions, Config) ->
Cluster = ?config(cluster, Config),
emqx_cth_cluster:stop(Cluster),
ok;
end_per_group(common, Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps), emqx_cth_suite:stop(Apps),
ok. ok.
@ -325,26 +347,36 @@ t_monitor_api_error(_) ->
ok. ok.
%% Verifies that subscriptions from persistent sessions are correctly accounted for. %% Verifies that subscriptions from persistent sessions are correctly accounted for.
t_persistent_session_stats(_Config) -> t_persistent_session_stats(Config) ->
[N1, N2 | _] = ?config(cluster, Config),
%% pre-condition %% pre-condition
true = emqx_persistent_message:is_persistence_enabled(), true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()),
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
NonPSClient = start_and_connect(#{ NonPSClient = start_and_connect(#{
port => Port1,
clientid => <<"non-ps">>, clientid => <<"non-ps">>,
expiry_interval => 0 expiry_interval => 0
}), }),
PSClient = start_and_connect(#{ PSClient1 = start_and_connect(#{
clientid => <<"ps">>, port => Port1,
clientid => <<"ps1">>,
expiry_interval => 30
}),
PSClient2 = start_and_connect(#{
port => Port2,
clientid => <<"ps2">>,
expiry_interval => 30 expiry_interval => 30
}), }),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic">>, 2),
{ok, _} = {ok, _} =
snabbkaffe:block_until( snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
@ -353,7 +385,7 @@ t_persistent_session_stats(_Config) ->
?retry(1_000, 10, begin ?retry(1_000, 10, begin
?assertMatch( ?assertMatch(
{ok, #{ {ok, #{
<<"connections">> := 2, <<"connections">> := 3,
<<"disconnected_durable_sessions">> := 0, <<"disconnected_durable_sessions">> := 0,
%% N.B.: we currently don't perform any deduplication between persistent %% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8 %% and non-persistent routes, so we count `commont/topic' twice and get 8
@ -363,25 +395,25 @@ t_persistent_session_stats(_Config) ->
<<"subscriptions_ram">> := 4, <<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4 <<"subscriptions_durable">> := 4
}}, }},
request(["monitor_current"]) ?ON(N1, request(["monitor_current"]))
) )
end), end),
%% Sanity checks %% Sanity checks
PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes), PSRouteCount = ?ON(N1, emqx_persistent_session_ds_router:stats(n_routes)),
?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}), ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(), PSSubCount = ?ON(N1, emqx_persistent_session_bookkeeper:get_subscription_count()),
?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}), ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
%% Now with disconnected but alive persistent sessions %% Now with disconnected but alive persistent sessions
{ok, {ok, _}} = {ok, {ok, _}} =
?wait_async_action( ?wait_async_action(
emqtt:disconnect(PSClient), emqtt:disconnect(PSClient1),
#{?snk_kind := dashboard_monitor_flushed} #{?snk_kind := dashboard_monitor_flushed}
), ),
?retry(1_000, 10, begin ?retry(1_000, 10, begin
?assertMatch( ?assertMatch(
{ok, #{ {ok, #{
<<"connections">> := 2, <<"connections">> := 3,
<<"disconnected_durable_sessions">> := 1, <<"disconnected_durable_sessions">> := 1,
%% N.B.: we currently don't perform any deduplication between persistent %% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8 %% and non-persistent routes, so we count `commont/topic' twice and get 8
@ -391,7 +423,28 @@ t_persistent_session_stats(_Config) ->
<<"subscriptions_ram">> := 4, <<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4 <<"subscriptions_durable">> := 4
}}, }},
request(["monitor_current"]) ?ON(N1, request(["monitor_current"]))
)
end),
{ok, {ok, _}} =
?wait_async_action(
emqtt:disconnect(PSClient2),
#{?snk_kind := dashboard_monitor_flushed}
),
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
<<"connections">> := 3,
<<"disconnected_durable_sessions">> := 2,
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
?ON(N1, request(["monitor_current"]))
) )
end), end),
@ -467,15 +520,21 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
ok. ok.
start_and_connect(Opts) -> start_and_connect(Opts) ->
Defaults = #{clean_start => false, expiry_interval => 30}, Defaults = #{
clean_start => false,
expiry_interval => 30,
port => 1883
},
#{ #{
clientid := ClientId, clientid := ClientId,
clean_start := CleanStart, clean_start := CleanStart,
expiry_interval := EI expiry_interval := EI,
port := Port
} = maps:merge(Defaults, Opts), } = maps:merge(Defaults, Opts),
{ok, Client} = emqtt:start_link([ {ok, Client} = emqtt:start_link([
{clientid, ClientId}, {clientid, ClientId},
{clean_start, CleanStart}, {clean_start, CleanStart},
{port, Port},
{proto_ver, v5}, {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => EI}} {properties, #{'Session-Expiry-Interval' => EI}}
]), ]),
@ -484,3 +543,7 @@ start_and_connect(Opts) ->
end), end),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
Client. Client.
get_mqtt_port(Node, Type) ->
{_IP, Port} = ?ON(Node, emqx_config:get([listeners, Type, default, bind])),
Port.