From a752119d05ade2eccfc619d8bbb43a732c5b9d61 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 Jan 2024 23:40:34 +0800 Subject: [PATCH] fix(prom_di): use bridge_v2 metrics api --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 3 + .../src/emqx_prometheus_data_integration.erl | 69 +++++++++---------- 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 95471ae5b..6dcd24355 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -1177,6 +1177,9 @@ format_resource( ) ). +%% FIXME: +%% missing metrics: +%% 'retried.success' and 'retried.failed' format_metrics(#{ counters := #{ 'dropped' := Dropped, diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 15fbe8106..f3bc98f50 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -69,21 +69,24 @@ %% Callback for emqx_prometheus_cluster %%-------------------------------------------------------------------- +-define(ROOT_KEY_ACTIONS, actions). + fetch_from_local_node(Mode) -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), + Connectors = emqx_connector:list(), {node(self()), #{ rule_metric_data => rule_metric_data(Mode, Rules), action_metric_data => action_metric_data(Mode, Bridges), - connector_metric_data => connector_metric_data(Mode, Bridges) + connector_metric_data => connector_metric_data(Mode, Connectors) }}. fetch_cluster_consistented_data() -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Connectors = emqx_connector:list(), (maybe_collect_schema_registry())#{ rules_ov_data => rules_ov_data(Rules), - connectors_ov_data => connectors_ov_data(Bridges) + connectors_ov_data => connectors_ov_data(Connectors) }. aggre_or_zip_init_acc() -> @@ -293,10 +296,10 @@ connectors_ov_metric_meta() -> connectors_ov_metric(names) -> emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). -connectors_ov_data(Brdiges) -> +connectors_ov_data(Connectors) -> #{ %% Both Bridge V1 and V2 - emqx_connectors_count => erlang:length(Brdiges) + emqx_connectors_count => erlang:length(Connectors) }. %%======================================== @@ -416,29 +419,25 @@ action_point(Mode, Id, V) -> {with_node_label(Mode, [{id, Id}]), V}. get_bridge_metric(Type, Name) -> - case emqx_bridge:get_metrics(Type, Name) of - #{counters := Counters, gauges := Gauges} -> - #{ - emqx_action_matched => ?MG0(matched, Counters), - emqx_action_dropped => ?MG0(dropped, Counters), - emqx_action_success => ?MG0(success, Counters), - emqx_action_failed => ?MG0(failed, Counters), - emqx_action_inflight => ?MG0(inflight, Gauges), - emqx_action_received => ?MG0(received, Counters), - emqx_action_late_reply => ?MG0(late_reply, Counters), - emqx_action_retried => ?MG0(retried, Counters), - emqx_action_retried_success => ?MG0('retried.success', Counters), - emqx_action_retried_failed => ?MG0('retried.failed', Counters), - emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), - emqx_action_dropped_resource_not_found => ?MG0( - 'dropped.resource_not_found', Counters - ), - emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), - emqx_action_dropped_other => ?MG0('dropped.other', Counters), - emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), - emqx_action_queuing => ?MG0(queuing, Gauges) - } - end. + #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name), + #{ + emqx_action_matched => ?MG0(matched, Counters), + emqx_action_dropped => ?MG0(dropped, Counters), + emqx_action_success => ?MG0(success, Counters), + emqx_action_failed => ?MG0(failed, Counters), + emqx_action_inflight => ?MG0(inflight, Gauges), + emqx_action_received => ?MG0(received, Counters), + emqx_action_late_reply => ?MG0(late_reply, Counters), + emqx_action_retried => ?MG0(retried, Counters), + emqx_action_retried_success => ?MG0('retried.success', Counters), + emqx_action_retried_failed => ?MG0('retried.failed', Counters), + emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), + emqx_action_dropped_resource_not_found => ?MG0('dropped.resource_not_found', Counters), + emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), + emqx_action_dropped_other => ?MG0('dropped.other', Counters), + emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), + emqx_action_queuing => ?MG0(queuing, Gauges) + }. %%==================== %% Connector Metric @@ -453,14 +452,14 @@ connector_metric_meta() -> connectr_metric(names) -> emqx_prometheus_cluster:metric_names(connector_metric_meta()). -connector_metric_data(Mode, Bridges) -> +connector_metric_data(Mode, Connectors) -> lists:foldl( - fun(#{type := Type, name := Name} = Bridge, AccIn) -> - Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_connectors(Mode, Id, get_connector_status(Bridge), AccIn) + fun(#{type := Type, name := Name} = Connector, AccIn) -> + Id = emqx_connector_resource:connector_id(Type, Name), + merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) end, maps:from_keys(connectr_metric(names), []), - Bridges + Connectors ). merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> @@ -475,7 +474,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> connector_point(Mode, Id, V) -> {with_node_label(Mode, [{id, Id}]), V}. -get_connector_status(#{resource_data := ResourceData} = _Bridge) -> +get_connector_status(#{resource_data := ResourceData} = _Connector) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Status = ?MG(status, ResourceData), #{