feat(cluster link api): add forwarding resource metrics to response

This commit is contained in:
Thales Macedo Garitezi 2024-07-24 12:26:01 -03:00
parent 34f5a886ce
commit 7829838dc5
4 changed files with 86 additions and 13 deletions

View File

@ -237,10 +237,19 @@ handle_metrics(Name) ->
{NodeMetrics0, NodeErrors} =
lists:foldl(
fun
({Node, {ok, Metrics}}, {OkAccIn, ErrAccIn}) ->
{[format_metrics(Node, Metrics) | OkAccIn], ErrAccIn};
({Node, Error}, {OkAccIn, ErrAccIn}) ->
{OkAccIn, [{Node, Error} | ErrAccIn]}
({Node, {ok, RouterMetrics}, {ok, ResourceMetrics}}, {OkAccIn, ErrAccIn}) ->
OkAcc = [format_metrics(Node, RouterMetrics, ResourceMetrics) | OkAccIn],
{OkAcc, ErrAccIn};
({Node, {ok, RouterMetrics}, ResError}, {OkAccIn, ErrAccIn}) ->
OkAcc = [format_metrics(Node, RouterMetrics, _ResourceMetrics = #{}) | OkAccIn],
{OkAcc, [{Node, #{resource => ResError}} | ErrAccIn]};
({Node, RouterError, {ok, ResourceMetrics}}, {OkAccIn, ErrAccIn}) ->
OkAcc = [format_metrics(Node, _RouterMetrics = #{}, ResourceMetrics) | OkAccIn],
{OkAcc, [{Node, #{router => RouterError}} | ErrAccIn]};
({Node, RouterError, ResourceError}, {OkAccIn, ErrAccIn}) ->
{OkAccIn, [
{Node, #{router => RouterError, resource => ResourceError}} | ErrAccIn
]}
end,
{[], []},
Results
@ -254,7 +263,7 @@ handle_metrics(Name) ->
errors => maps:from_list(NodeErrors)
})
end,
NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}) end, NodeErrors),
NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}, #{}) end, NodeErrors),
NodeMetrics = NodeMetrics1 ++ NodeMetrics0,
AggregatedMetrics = aggregate_metrics(NodeMetrics),
Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics},
@ -270,12 +279,27 @@ aggregate_metrics(NodeMetrics) ->
NodeMetrics
).
%% Build the per-node metrics entry: a map with the node under `node` and a
%% flat map of metric values under `metrics`.
%%
%% NOTE(review): this span looks like a commit diff rendered without +/-
%% markers: the format_metrics/2 head and the `?route_metric => Routes` entry
%% without a trailing comma appear right next to their format_metrics/3
%% counterparts. Confirm against the real module before editing the code.
%%
%% In the 3-arity form:
%%   * RouterMetrics supplies the `?route_metric` counter.
%%   * ResourceMetrics supplies the matched/success/failed/dropped/retried/
%%     received counters, the queuing/inflight gauges, and the current/last5m/
%%     max rate of the `matched` metric.
%% Every lookup goes through emqx_utils_maps:deep_get/3 with 0 as the third
%% argument (presumably the value returned when the path is absent -- confirm
%% against emqx_utils_maps).
format_metrics(Node, Metrics) ->
Routes = emqx_utils_maps:deep_get([counters, ?route_metric], Metrics, 0),
format_metrics(Node, RouterMetrics, ResourceMetrics) ->
%% Shared accessor so every metric below uses the same third argument (0).
Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end,
Routes = Get([counters, ?route_metric], RouterMetrics),
#{
node => Node,
metrics => #{
?route_metric => Routes
?route_metric => Routes,
'matched' => Get([counters, 'matched'], ResourceMetrics),
'success' => Get([counters, 'success'], ResourceMetrics),
'failed' => Get([counters, 'failed'], ResourceMetrics),
'dropped' => Get([counters, 'dropped'], ResourceMetrics),
'retried' => Get([counters, 'retried'], ResourceMetrics),
'received' => Get([counters, 'received'], ResourceMetrics),
'queuing' => Get([gauges, 'queuing'], ResourceMetrics),
'inflight' => Get([gauges, 'inflight'], ResourceMetrics),
%% All three rate fields are read from the `matched` metric's rate entry.
'rate' => Get([rate, 'matched', current], ResourceMetrics),
'rate_last5m' => Get([rate, 'matched', last5m], ResourceMetrics),
'rate_max' => Get([rate, 'matched', max], ResourceMetrics)
}
}.

View File

@ -30,8 +30,12 @@
%% Fetch the metrics of cluster link ClusterName from every running node.
%%
%% NOTE(review): this span looks like a commit diff rendered without +/-
%% markers: the `Results = ...` / `lists:zip(Nodes, Results).` pair appears
%% next to the lines that replace it. Confirm against the real module before
%% editing the code.
%%
%% In the newer form two requests are issued one after the other, each with
%% the same Timeout: one for ?METRIC_NAME keyed by ClusterName, and one for
%% `resource_metrics` keyed by the id from emqx_cluster_link_mqtt:resource_id/1.
%% The result is a list of {Node, RouterResult, ResourceResult} triples.
get_metrics(ClusterName) ->
Nodes = emqx:running_nodes(),
Timeout = 15_000,
Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
lists:zip(Nodes, Results).
RouterResults = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout),
ResourceId = emqx_cluster_link_mqtt:resource_id(ClusterName),
ResourceResults = emqx_metrics_proto_v2:get_metrics(
Nodes, resource_metrics, ResourceId, Timeout
),
%% lists:zip3/3 needs three lists of equal length; presumably each call
%% returns one result per entry of Nodes, in order -- TODO confirm against
%% emqx_metrics_proto_v2.
lists:zip3(Nodes, RouterResults, ResourceResults).
maybe_create_metrics(ClusterName) ->
case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of

View File

@ -9,6 +9,7 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-behaviour(emqx_resource).
-behaviour(ecpool_worker).
@ -27,6 +28,7 @@
]).
-export([
resource_id/1,
ensure_msg_fwd_resource/1,
remove_msg_fwd_resource/1,
decode_route_op/1,
@ -92,6 +94,10 @@
-type cluster_name() :: binary().
%% Return the resource id for the given cluster name, as computed by the
%% ?MSG_RES_ID macro.
-spec resource_id(cluster_name()) -> resource_id().
resource_id(ClusterName) ->
?MSG_RES_ID(ClusterName).
-spec ensure_msg_fwd_resource(map()) ->
{ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->

View File

@ -477,15 +477,54 @@ t_metrics(Config) ->
?assertMatch(
{200, #{
<<"metrics">> := #{<<"routes">> := 0},
<<"metrics">> := #{
<<"routes">> := 0,
<<"matched">> := _,
<<"success">> := _,
<<"failed">> := _,
<<"dropped">> := _,
<<"retried">> := _,
<<"received">> := _,
<<"queuing">> := _,
<<"inflight">> := _,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
},
<<"node_metrics">> := [
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
<<"metrics">> := #{
<<"routes">> := 0,
<<"matched">> := _,
<<"success">> := _,
<<"failed">> := _,
<<"dropped">> := _,
<<"retried">> := _,
<<"received">> := _,
<<"queuing">> := _,
<<"inflight">> := _,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
}
},
#{
<<"node">> := _,
<<"metrics">> := #{<<"routes">> := 0}
<<"metrics">> := #{
<<"routes">> := 0,
<<"matched">> := _,
<<"success">> := _,
<<"failed">> := _,
<<"dropped">> := _,
<<"retried">> := _,
<<"received">> := _,
<<"queuing">> := _,
<<"inflight">> := _,
<<"rate">> := _,
<<"rate_last5m">> := _,
<<"rate_max">> := _
}
}
]
}},