From 36f009b0c2fcc18d7a71498c04c0c7c76f4d1b7f Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Jan 2024 13:49:38 +0800 Subject: [PATCH] fix(prom): connectors specific data --- .../src/emqx_prometheus_data_integration.erl | 65 +++++++++++++++---- 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 9840d2409..3546697cc 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -36,13 +36,13 @@ schema_registry/0, schema_registry_data/0, connectors/0, - connectors_data/0, + connectors_data/1, rule_specific/0, rule_specific_data/1, action_specific/0, - action_specific_data/0, + action_specific_data/1, connector_specific/0, - connector_specific_data/0 + connector_specific_data/1 ]). -include("emqx_prometheus.hrl"). @@ -84,11 +84,13 @@ deregister_cleanup(_) -> ok. %% erlfmt-ignore collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> Rules = emqx_rule_engine:get_rules(), + Bridges =emqx_bridge:list(), _ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], _ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], - _ = [add_collect_family(Name, connectors_data(), Callback, gauge) || Name <- connectors()], + _ = [add_collect_family(Name, connectors_data(Bridges), Callback, gauge) || Name <- connectors()], _ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()], - _ = [add_collect_family(Name, action_specific_data(), Callback, gauge) || Name <- action_specific()], + _ = [add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) || Name <- action_specific()], + _ = [add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) || Name <- connector_specific()], ok = maybe_collect_family_schema_registry(Callback), ok; collect_mf(_, _) -> @@ -222,11 +224,15 @@ collect_di(K = emqx_action_queuing, Data) -> collect_di(K = emqx_action_rate_last5m, Data) -> gauge_metrics(?MG(K, Data)); collect_di(K = emqx_action_rate_max, Data) -> - gauge_metrics(?MG(K, Data)). - + gauge_metrics(?MG(K, Data)); %%==================== %% Specific Connector +collect_di(K = emqx_connector_enable, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_connector_status, Data) -> + gauge_metrics(?MG(K, Data)). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -328,10 +334,10 @@ connectors() -> emqx_connector_count ]. -connectors_data() -> +connectors_data(Brdiges) -> #{ %% Both Bridge V1 and V2 - emqx_connector_count => erlang:length(emqx_bridge:list()) + emqx_connector_count => erlang:length(Brdiges) }. %%======================================== @@ -429,14 +435,14 @@ action_specific() -> emqx_action_rate_max ]. -action_specific_data() -> +action_specific_data(Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = _Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) end, maps:from_keys(action_specific(), []), - emqx_bridge:list() + Bridges ). merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> @@ -491,10 +497,43 @@ connector_specific() -> emqx_connector_status ]. -connector_specific_data() -> - []. +connector_specific_data(Bridges) -> + lists:foldl( + fun(#{type := Type, name := Name} = Bridge, AccIn) -> + Id = emqx_bridge_resource:bridge_id(Type, Name), + merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) + end, + maps:from_keys(connector_specific(), []), + Bridges + ). + +merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) -> + maps:fold( + fun(K, V, AccIn) -> + AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]} + end, + PointsAcc, + ConnectorMetrics + ). + +connector_point(Id, V) -> + {[{id, Id}], V}. + +get_connector_status(#{resource_data := ResourceData} = _Bridge) -> + Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), + Status = ?MG(status, ResourceData), + #{ + emqx_connector_enable => boolean_to_number(Enabled), + emqx_connector_status => status_to_number(Status) + }. %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- %% Help funcs + +boolean_to_number(true) -> 1; +boolean_to_number(false) -> 0. + +status_to_number(connected) -> 1; +status_to_number(disconnected) -> 0.