diff --git a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index c4f1dc3b8..00a464811 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -24,7 +24,6 @@ collect_json_data/2, aggre_cluster/3, - %% with_node_name_label/2, point_to_map_fun/1, diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 32dc934c7..7f122aaed 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -73,20 +73,23 @@ fetch_from_local_node(Mode) -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), + BridgesV1 = emqx:get_config([bridges], #{}), + BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), Connectors = emqx_connector:list(), {node(self()), #{ rule_metric_data => rule_metric_data(Mode, Rules), - action_metric_data => action_metric_data(Mode, Bridges), - connector_metric_data => connector_metric_data(Mode, Connectors) + action_metric_data => action_metric_data(Mode, BridgeV2Actions), + connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors) }}. fetch_cluster_consistented_data() -> Rules = emqx_rule_engine:get_rules(), + %% for bridge v1 + BridgesV1 = emqx:get_config([bridges], #{}), Connectors = emqx_connector:list(), (maybe_collect_schema_registry())#{ rules_ov_data => rules_ov_data(Rules), - connectors_ov_data => connectors_ov_data(Connectors) + connectors_ov_data => connectors_ov_data(BridgesV1, Connectors) }. aggre_or_zip_init_acc() -> @@ -144,9 +147,13 @@ collect_mf(_, _) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Connectors = emqx_connector:list(), + %% for bridge v1 + BridgesV1 = emqx:get_config([bridges], #{}), #{ - data_integration_overview => collect_data_integration_overview(Rules, Bridges), + data_integration_overview => collect_data_integration_overview( + Rules, BridgesV1, Connectors + ), rules => collect_json_data(?MG(rule_metric_data, RawData)), actions => collect_json_data(?MG(action_metric_data, RawData)), connectors => collect_json_data(?MG(connector_metric_data, RawData)) @@ -298,10 +305,17 @@ connectors_ov_metric_meta() -> connectors_ov_metric(names) -> emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). -connectors_ov_data(Connectors) -> +connectors_ov_data(BridgesV1, Connectors) -> + %% Both Bridge V1 and V2 + V1ConnectorsCnt = maps:fold( + fun(_Type, NameAndConf, AccIn) -> + AccIn + maps:size(NameAndConf) + end, + 0, + BridgesV1 + ), #{ - %% Both Bridge V1 and V2 - emqx_connectors_count => erlang:length(Connectors) + emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt }. %%======================================== @@ -466,16 +480,44 @@ connector_metric_meta() -> connectr_metric(names) -> emqx_prometheus_cluster:metric_names(connector_metric_meta()). -connector_metric_data(Mode, Connectors) -> +connector_metric_data(Mode, BridgesV1, Connectors) -> + AccIn = maps:from_keys(connectr_metric(names), []), + Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn), + _AccOut = connector_metric_data_v2(Mode, Connectors, Acc0). + +connector_metric_data_v2(Mode, Connectors, InitAcc) -> lists:foldl( - fun(#{type := Type, name := Name} = Connector, AccIn) -> + fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) -> Id = emqx_connector_resource:connector_id(Type, Name), - merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) + merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn) end, - maps:from_keys(connectr_metric(names), []), + InitAcc, Connectors ). +connector_metric_data_v1(Mode, BridgesV1, InitAcc) -> + maps:fold( + fun(Type, NameAndConfMap, Acc0) -> + maps:fold( + fun(Name, _Conf, Acc1) -> + BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name), + case emqx_resource:get_instance(BridgeV1Id) of + {error, not_found} -> + Acc1; + {ok, _, ResourceData} -> + merge_acc_with_connectors( + Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1 + ) + end + end, + Acc0, + NameAndConfMap + ) + end, + InitAcc, + BridgesV1 + ). + merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> @@ -488,7 +530,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> connector_point(Mode, Id, V) -> {with_node_label(Mode, [{id, Id}]), V}. -get_connector_status(#{resource_data := ResourceData} = _Connector) -> +get_connector_status(ResourceData) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Status = ?MG(status, ResourceData), #{ @@ -502,9 +544,9 @@ get_connector_status(#{resource_data := ResourceData} = _Connector) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% merge / zip formatting funcs for type `application/json` -collect_data_integration_overview(Rules, Bridges) -> +collect_data_integration_overview(Rules, BridgesV1, Connectors) -> RulesD = rules_ov_data(Rules), - ConnectorsD = connectors_ov_data(Bridges), + ConnectorsD = connectors_ov_data(BridgesV1, Connectors), M1 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,