From 7829838dc5f9083bdbc41bbf61b534fb6d0c0ef9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 Jul 2024 12:26:01 -0300 Subject: [PATCH] feat(cluster link api): add forwarding resource metrics to response --- .../src/emqx_cluster_link_api.erl | 40 +++++++++++++---- .../src/emqx_cluster_link_metrics.erl | 8 +++- .../src/emqx_cluster_link_mqtt.erl | 6 +++ .../test/emqx_cluster_link_api_SUITE.erl | 45 +++++++++++++++++-- 4 files changed, 86 insertions(+), 13 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 257a19854..6511cf395 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -237,10 +237,19 @@ handle_metrics(Name) -> {NodeMetrics0, NodeErrors} = lists:foldl( fun - ({Node, {ok, Metrics}}, {OkAccIn, ErrAccIn}) -> - {[format_metrics(Node, Metrics) | OkAccIn], ErrAccIn}; - ({Node, Error}, {OkAccIn, ErrAccIn}) -> - {OkAccIn, [{Node, Error} | ErrAccIn]} + ({Node, {ok, RouterMetrics}, {ok, ResourceMetrics}}, {OkAccIn, ErrAccIn}) -> + OkAcc = [format_metrics(Node, RouterMetrics, ResourceMetrics) | OkAccIn], + {OkAcc, ErrAccIn}; + ({Node, {ok, RouterMetrics}, ResError}, {OkAccIn, ErrAccIn}) -> + OkAcc = [format_metrics(Node, RouterMetrics, _ResourceMetrics = #{}) | OkAccIn], + {OkAcc, [{Node, #{resource => ResError}} | ErrAccIn]}; + ({Node, RouterError, {ok, ResourceMetrics}}, {OkAccIn, ErrAccIn}) -> + OkAcc = [format_metrics(Node, _RouterMetrics = #{}, ResourceMetrics) | OkAccIn], + {OkAcc, [{Node, #{router => RouterError}} | ErrAccIn]}; + ({Node, RouterError, ResourceError}, {OkAccIn, ErrAccIn}) -> + {OkAccIn, [ + {Node, #{router => RouterError, resource => ResourceError}} | ErrAccIn + ]} end, {[], []}, Results @@ -254,7 +263,7 @@ handle_metrics(Name) -> errors => maps:from_list(NodeErrors) }) end, - NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}) end, NodeErrors), + NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}, #{}) end, NodeErrors), NodeMetrics = NodeMetrics1 ++ NodeMetrics0, AggregatedMetrics = aggregate_metrics(NodeMetrics), Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics}, @@ -270,12 +279,27 @@ aggregate_metrics(NodeMetrics) -> NodeMetrics ). -format_metrics(Node, Metrics) -> - Routes = emqx_utils_maps:deep_get([counters, ?route_metric], Metrics, 0), +format_metrics(Node, RouterMetrics, ResourceMetrics) -> + Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end, + Routes = Get([counters, ?route_metric], RouterMetrics), #{ node => Node, metrics => #{ - ?route_metric => Routes + ?route_metric => Routes, + + 'matched' => Get([counters, 'matched'], ResourceMetrics), + 'success' => Get([counters, 'success'], ResourceMetrics), + 'failed' => Get([counters, 'failed'], ResourceMetrics), + 'dropped' => Get([counters, 'dropped'], ResourceMetrics), + 'retried' => Get([counters, 'retried'], ResourceMetrics), + 'received' => Get([counters, 'received'], ResourceMetrics), + + 'queuing' => Get([gauges, 'queuing'], ResourceMetrics), + 'inflight' => Get([gauges, 'inflight'], ResourceMetrics), + + 'rate' => Get([rate, 'matched', current], ResourceMetrics), + 'rate_last5m' => Get([rate, 'matched', last5m], ResourceMetrics), + 'rate_max' => Get([rate, 'matched', max], ResourceMetrics) } }. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl index 695419c50..61e5fc9ce 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl @@ -30,8 +30,12 @@ get_metrics(ClusterName) -> Nodes = emqx:running_nodes(), Timeout = 15_000, - Results = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), - lists:zip(Nodes, Results). + RouterResults = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), + ResourceId = emqx_cluster_link_mqtt:resource_id(ClusterName), + ResourceResults = emqx_metrics_proto_v2:get_metrics( + Nodes, resource_metrics, ResourceId, Timeout + ), + lists:zip3(Nodes, RouterResults, ResourceResults). maybe_create_metrics(ClusterName) -> case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 65e02f53e..5a0f6db9c 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -9,6 +9,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). -behaviour(ecpool_worker). @@ -27,6 +28,7 @@ ]). -export([ + resource_id/1, ensure_msg_fwd_resource/1, remove_msg_fwd_resource/1, decode_route_op/1, @@ -92,6 +94,10 @@ -type cluster_name() :: binary(). +-spec resource_id(cluster_name()) -> resource_id(). +resource_id(ClusterName) -> + ?MSG_RES_ID(ClusterName). + -spec ensure_msg_fwd_resource(map()) -> {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) -> diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index 6b469272a..e8e8f345e 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -477,15 +477,54 @@ t_metrics(Config) -> ?assertMatch( {200, #{ - <<"metrics">> := #{<<"routes">> := 0}, + <<"metrics">> := #{ + <<"routes">> := 0, + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + }, <<"node_metrics">> := [ #{ <<"node">> := _, - <<"metrics">> := #{<<"routes">> := 0} + <<"metrics">> := #{ + <<"routes">> := 0, + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } }, #{ <<"node">> := _, - <<"metrics">> := #{<<"routes">> := 0} + <<"metrics">> := #{ + <<"routes">> := 0, + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } } ] }},