diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 45b97e8f7..77f613e45 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -274,13 +274,41 @@ append_errors(RouterError, ResourceError, Node, Acc) -> aggregate_metrics(NodeMetrics) -> ErrorLogger = fun(_) -> ok end, - lists:foldl( - fun(#{metrics := Metrics}, Acc) -> - emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger) + #{metrics := #{router := EmptyRouterMetrics}} = format_metrics(node(), #{}, #{}), + {RouterMetrics, ResourceMetrics} = lists:foldl( + fun( + #{metrics := #{router := RMetrics, forwarding := FMetrics}}, + {RouterAccIn, ResourceAccIn} + ) -> + ResourceAcc = + emqx_utils_maps:best_effort_recursive_sum(FMetrics, ResourceAccIn, ErrorLogger), + RouterAcc = merge_cluster_wide_metrics(RMetrics, RouterAccIn), + {RouterAcc, ResourceAcc} end, - #{}, + {EmptyRouterMetrics, #{}}, NodeMetrics - ). + ), + #{router => RouterMetrics, forwarding => ResourceMetrics}. + +merge_cluster_wide_metrics(Metrics, Acc) -> + %% For cluster-wide metrics, all nodes should report the same values, except if the + %% RPC to fetch a node's metrics failed, in which case all values will be 0. + F = + fun(_Key, V1, V2) -> + case {erlang:is_map(V1), erlang:is_map(V2)} of + {true, true} -> + merge_cluster_wide_metrics(V1, V2); + {true, false} -> + merge_cluster_wide_metrics(V1, #{}); + {false, true} -> + merge_cluster_wide_metrics(V2, #{}); + {false, false} -> + true = is_number(V1), + true = is_number(V2), + max(V1, V2) + end + end, + maps:merge_with(F, Acc, Metrics). format_metrics(Node, RouterMetrics, ResourceMetrics) -> Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end, diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index 6fd54e228..8157c86d6 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -612,15 +612,15 @@ t_metrics(Config) -> #{?snk_kind := clink_route_sync_complete} ), - %% Routes = 4 in source cluster, because the target cluster has some topic filters - %% configured and subscribers to them, which were replicated to the source cluster, - %% and we have 2 nodes with 2 routes each. + %% Routes = 2 in source cluster, because the target cluster has some topic filters + %% configured and subscribers to them, which were replicated to the source cluster. + %% This metric is global (cluster-wide). ?retry( 300, 10, ?assertMatch( {200, #{ - <<"metrics">> := #{<<"router">> := #{<<"routes">> := 4}}, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}, <<"node_metrics">> := [ #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}, #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}} @@ -652,7 +652,7 @@ t_metrics(Config) -> 10, ?assertMatch( {200, #{ - <<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}, <<"node_metrics">> := [ #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}, #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}} @@ -712,7 +712,7 @@ t_metrics(Config) -> 10, ?assertMatch( {200, #{ - <<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}, <<"node_metrics">> := _ }}, get_metrics(source, SourceName)