fix(cluster link metrics): route count metric is cluster-wide
This commit is contained in:
parent
6da71200f3
commit
03821c7b49
|
@ -274,13 +274,41 @@ append_errors(RouterError, ResourceError, Node, Acc) ->
|
|||
|
||||
aggregate_metrics(NodeMetrics) ->
|
||||
ErrorLogger = fun(_) -> ok end,
|
||||
lists:foldl(
|
||||
fun(#{metrics := Metrics}, Acc) ->
|
||||
emqx_utils_maps:best_effort_recursive_sum(Metrics, Acc, ErrorLogger)
|
||||
#{metrics := #{router := EmptyRouterMetrics}} = format_metrics(node(), #{}, #{}),
|
||||
{RouterMetrics, ResourceMetrics} = lists:foldl(
|
||||
fun(
|
||||
#{metrics := #{router := RMetrics, forwarding := FMetrics}},
|
||||
{RouterAccIn, ResourceAccIn}
|
||||
) ->
|
||||
ResourceAcc =
|
||||
emqx_utils_maps:best_effort_recursive_sum(FMetrics, ResourceAccIn, ErrorLogger),
|
||||
RouterAcc = merge_cluster_wide_metrics(RMetrics, RouterAccIn),
|
||||
{RouterAcc, ResourceAcc}
|
||||
end,
|
||||
#{},
|
||||
{EmptyRouterMetrics, #{}},
|
||||
NodeMetrics
|
||||
).
|
||||
),
|
||||
#{router => RouterMetrics, forwarding => ResourceMetrics}.
|
||||
|
||||
merge_cluster_wide_metrics(Metrics, Acc) ->
|
||||
%% For cluster-wide metrics, all nodes should report the same values, except if the
|
||||
%% RPC to fetch a node's metrics failed, in which case all values will be 0.
|
||||
F =
|
||||
fun(_Key, V1, V2) ->
|
||||
case {erlang:is_map(V1), erlang:is_map(V2)} of
|
||||
{true, true} ->
|
||||
merge_cluster_wide_metrics(V1, V2);
|
||||
{true, false} ->
|
||||
merge_cluster_wide_metrics(V1, #{});
|
||||
{false, true} ->
|
||||
merge_cluster_wide_metrics(V2, #{});
|
||||
{false, false} ->
|
||||
true = is_number(V1),
|
||||
true = is_number(V2),
|
||||
max(V1, V2)
|
||||
end
|
||||
end,
|
||||
maps:merge_with(F, Acc, Metrics).
|
||||
|
||||
format_metrics(Node, RouterMetrics, ResourceMetrics) ->
|
||||
Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end,
|
||||
|
|
|
@ -612,15 +612,15 @@ t_metrics(Config) ->
|
|||
#{?snk_kind := clink_route_sync_complete}
|
||||
),
|
||||
|
||||
%% Routes = 4 in source cluster, because the target cluster has some topic filters
|
||||
%% configured and subscribers to them, which were replicated to the source cluster,
|
||||
%% and we have 2 nodes with 2 routes each.
|
||||
%% Routes = 2 in source cluster, because the target cluster has some topic filters
|
||||
%% configured and subscribers to them, which were replicated to the source cluster.
|
||||
%% This metric is global (cluster-wide).
|
||||
?retry(
|
||||
300,
|
||||
10,
|
||||
?assertMatch(
|
||||
{200, #{
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 4}},
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}},
|
||||
<<"node_metrics">> := [
|
||||
#{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}},
|
||||
#{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}
|
||||
|
@ -652,7 +652,7 @@ t_metrics(Config) ->
|
|||
10,
|
||||
?assertMatch(
|
||||
{200, #{
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}},
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
|
||||
<<"node_metrics">> := [
|
||||
#{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}},
|
||||
#{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}
|
||||
|
@ -712,7 +712,7 @@ t_metrics(Config) ->
|
|||
10,
|
||||
?assertMatch(
|
||||
{200, #{
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}},
|
||||
<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}},
|
||||
<<"node_metrics">> := _
|
||||
}},
|
||||
get_metrics(source, SourceName)
|
||||
|
|
Loading…
Reference in New Issue