From c3da7923233b1d07f8208c47d5c8137fdc66f2da Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 18 Jan 2024 11:23:23 +0800 Subject: [PATCH] feat(prom_di): cluster aggregated/unaggregated metrics --- .../src/emqx_prometheus_data_integration.erl | 310 ++++++++++++++---- 1 file changed, 246 insertions(+), 64 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index edbdc1afb..06a417d2d 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -24,6 +24,11 @@ -export([collect/1]). +%% for bpapi +-export([ + fetch_metric_data_from_local_node/0 +]). + -export([add_collect_family/4]). -include("emqx_prometheus.hrl"). @@ -105,6 +110,12 @@ -else. -endif. +-define(LOGICAL_SUM_METRIC_NAMES, [ + emqx_rule_enable, + emqx_connector_enable, + emqx_connector_status +]). + %%-------------------------------------------------------------------- %% Collector API %%-------------------------------------------------------------------- @@ -117,23 +128,24 @@ deregister_cleanup(_) -> ok. _Registry :: prometheus_registry:registry(), Callback :: prometheus_collector:collect_mf_callback(). collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> - Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + RawData = raw_data(erlang:get(format_mode)), + %% Data Integration Overview - ok = add_collect_family(Callback, ?RULES_WITH_TYPE, rules_data(Rules)), - ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, connectors_data(Bridges)), + ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)), + ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, ?MG(connectors_data, RawData)), ok = maybe_collect_family_schema_registry(Callback), %% Rule Specific - ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, rule_specific_data(Rules)), + RuleSpecificDs = ?MG(rule_specific_data, RawData), + ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, RuleSpecificDs), %% Action Specific - ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, action_specific_data(Bridges)), + ActionSpecificDs = ?MG(action_specific_data, RawData), + ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, ActionSpecificDs), %% Connector Specific - ok = add_collect_family( - Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, connector_specific_data(Bridges) - ), + ConnectorSpecificDs = ?MG(connector_specific_data, RawData), + ok = add_collect_family(Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, ConnectorSpecificDs), ok; collect_mf(_, _) -> @@ -141,13 +153,14 @@ collect_mf(_, _) -> %% @private collect(<<"json">>) -> + RawData = raw_data(erlang:get(format_mode)), Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), #{ data_integration_overview => collect_data_integration_overview(Rules, Bridges), - rules => collect_data_integration(rules, Rules), - actions => collect_data_integration(actions, Bridges), - connectors => collect_data_integration(connectors, Bridges) + rules => collect_json_data(?MG(rule_specific_data, RawData)), + actions => collect_json_data(?MG(action_specific_data, RawData)), + connectors => collect_json_data(?MG(connector_specific_data, RawData)) }; collect(<<"prometheus">>) -> prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY). @@ -166,32 +179,23 @@ add_collect_family(Name, Data, Callback, Type) -> collect_metrics(Name, Metrics) -> collect_di(Name, Metrics). -collect_data_integration_overview(Rules, Bridges) -> - RulesD = rules_data(Rules), - ConnectorsD = connectors_data(Bridges), +%% @private +fetch_metric_data_from_local_node() -> + Rules = emqx_rule_engine:get_rules(), + Bridges = emqx_bridge:list(), + {node(self()), #{ + rule_specific_data => rule_specific_data(Rules), + action_specific_data => action_specific_data(Bridges), + connector_specific_data => connector_specific_data(Bridges) + }}. - M1 = lists:foldl( - fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, - #{}, - metric_names(?RULES_WITH_TYPE) - ), - M2 = lists:foldl( - fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, - #{}, - metric_names(?CONNECTORS_WITH_TYPE) - ), - M3 = maybe_collect_schema_registry(), - - lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]). - -collect_data_integration(Type, DataSeed) -> - maps:fold( - fun(K, V, Acc) -> - zip_metrics(Type, K, V, Acc) - end, - [], - di_data(Type, DataSeed) - ). +fetch_cluster_consistented_metric_data() -> + Rules = emqx_rule_engine:get_rules(), + Bridges = emqx_bridge:list(), + (maybe_collect_schema_registry())#{ + rules_data => rules_data(Rules), + connectors_data => connectors_data(Bridges) + }. -if(?EMQX_RELEASE_EDITION == ee). maybe_collect_family_schema_registry(Callback) -> @@ -208,6 +212,24 @@ maybe_collect_schema_registry() -> #{}. -endif. +%% raw data for different format modes +raw_data(nodes_aggregated) -> + AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()), + maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data()); +raw_data(nodes_unaggregated) -> + %% then fold from all nodes + AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()), + maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data()); +raw_data(node) -> + {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(), + maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()). + +metrics_data_from_all_nodes() -> + Nodes = mria:running_nodes(), + _ResL = emqx_prometheus_proto_v2:raw_prom_data( + Nodes, ?MODULE, fetch_metric_data_from_local_node, [] + ). + %%-------------------------------------------------------------------- %% Collector %%-------------------------------------------------------------------- @@ -352,7 +374,7 @@ rule_specific_data(Rules) -> fun(#{id := Id} = Rule, AccIn) -> merge_acc_with_rules(Id, get_metric(Rule), AccIn) end, - maps:from_keys(metric_names(?RULES_SPECIFIC_WITH_TYPE), []), + maps:from_keys(rule_specific_metric_names(), []), Rules ). @@ -388,6 +410,9 @@ get_metric(#{id := Id, enable := Bool} = _Rule) -> } end. +rule_specific_metric_names() -> + metric_names(?RULES_SPECIFIC_WITH_TYPE). + %%==================== %% Specific Action %% With action_id: `{type}:{name}` as label key: `action_id` @@ -398,7 +423,7 @@ action_specific_data(Bridges) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) end, - maps:from_keys(metric_names(?ACTION_SPECIFIC_WITH_TYPE), []), + maps:from_keys(action_specific_metric_names(), []), Bridges ). @@ -439,6 +464,9 @@ get_bridge_metric(Type, Name) -> } end. +action_specific_metric_names() -> + metric_names(?ACTION_SPECIFIC_WITH_TYPE). + %%==================== %% Specific Connector %% With connector_id: `{type}:{name}` as label key: `connector_id` @@ -449,7 +477,7 @@ connector_specific_data(Bridges) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) end, - maps:from_keys(metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE), []), + maps:from_keys(connectr_specific_metric_names(), []), Bridges ). @@ -473,24 +501,47 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) -> emqx_connector_status => status_to_number(Status) }. -%%-------------------------------------------------------------------- +connectr_specific_metric_names() -> + metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE). %%-------------------------------------------------------------------- -%% Help funcs +%% Collect functions +%%-------------------------------------------------------------------- -boolean_to_number(true) -> 1; -boolean_to_number(false) -> 0. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% merge / zip formatting funcs for type `application/json` -status_to_number(connected) -> 1; -status_to_number(disconnected) -> 0. +collect_data_integration_overview(Rules, Bridges) -> + RulesD = rules_data(Rules), + ConnectorsD = connectors_data(Bridges), -zip_metrics(Type, K, V, Acc) -> - LabelK = label_key(Type), - do_zip_metrics(LabelK, K, V, Acc). + M1 = lists:foldl( + fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, + #{}, + metric_names(?RULES_WITH_TYPE) + ), + M2 = lists:foldl( + fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, + #{}, + metric_names(?CONNECTORS_WITH_TYPE) + ), + M3 = maybe_collect_schema_registry(), -do_zip_metrics(LabelK, Key, Points, [] = _AccIn) -> + lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]). + +collect_json_data(Data) -> + maps:fold( + fun(K, V, Acc) -> + zip_json_metrics(K, V, Acc) + end, + [], + Data + ). + +zip_json_metrics(Key, Points, [] = _AccIn) -> lists:foldl( - fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK -> + fun({Lables, Metric}, AccIn2) -> + LablesKVMap = maps:from_list(Lables), %% for initialized empty AccIn %% The following fields will be put into Result %% For Rules: @@ -500,19 +551,17 @@ do_zip_metrics(LabelK, Key, Points, [] = _AccIn) -> %% FOR Connectors %% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID %% formatted with {type}:{name} - Point = - #{ - LabelK => LabelV, Key => Metric - }, + Point = LablesKVMap#{Key => Metric}, [Point | AccIn2] end, [], Points ); -do_zip_metrics(LabelK, Key, Points, AllResultedAcc) -> +zip_json_metrics(Key, Points, AllResultedAcc) -> ThisKeyResult = lists:foldl( - fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK -> - [#{LabelK => Id, Key => Metric} | AccIn2] + fun({Lables, Metric}, AccIn2) -> + LablesKVMap = maps:from_list(Lables), + [maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2] end, [], Points @@ -525,13 +574,146 @@ do_zip_metrics(LabelK, Key, Points, AllResultedAcc) -> ThisKeyResult ). -di_data(rules, Rules) -> rule_specific_data(Rules); -di_data(actions, Bridges) -> action_specific_data(Bridges); -di_data(connectors, Bridges) -> connector_specific_data(Bridges). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% merge / zip formatting funcs for type `text/plain` +aggre_cluster(ResL) -> + do_aggre_cluster(ResL, aggre_or_zip_init_acc()). -label_key(rules) -> id; -label_key(actions) -> id; -label_key(connectors) -> id. +do_aggre_cluster([], AccIn) -> + AccIn; +do_aggre_cluster( + [ + {ok, + {_NodeName, #{ + rule_specific_data := NodeRuleMetrics, + action_specific_data := NodeActionMetrics, + connector_specific_data := NodeConnectorMetrics + }}} + | Rest + ], + #{ + rule_specific_data := RuleAcc, + action_specific_data := ActionAcc, + connector_specific_data := ConnAcc + } = AccIn +) -> + do_aggre_cluster( + Rest, + AccIn#{ + %% TODO + rule_specific_data => do_aggre_metric(NodeRuleMetrics, RuleAcc), + action_specific_data => do_aggre_metric(NodeActionMetrics, ActionAcc), + connector_specific_data => do_aggre_metric(NodeConnectorMetrics, ConnAcc) + } + ); +do_aggre_cluster([{_, _} | Rest], AccIn) -> + do_aggre_cluster(Rest, AccIn). + +do_aggre_metric(NodeMetrics, AccIn0) -> + lists:foldl( + fun(K, AccIn) -> + NAccL = do_aggre_metric(K, ?MG(K, NodeMetrics), ?MG(K, AccIn)), + AccIn#{K => NAccL} + end, + AccIn0, + maps:keys(NodeMetrics) + ). + +-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). + +do_aggre_metric(K, NodeMetrics, AccL) -> + lists:foldl( + fun({Labels, Metric}, AccIn) -> + NMetric = + case lists:member(K, ?LOGICAL_SUM_METRIC_NAMES) of + true -> + logic_sum(Metric, ?PG0(Labels, AccIn)); + false -> + Metric + ?PG0(Labels, AccIn) + end, + [{Labels, NMetric} | AccIn] + end, + AccL, + NodeMetrics + ). + +with_node_name_label(ResL) -> + do_with_node_name_label( + ResL, + aggre_or_zip_init_acc() + ). + +do_with_node_name_label([], AccIn) -> + AccIn; +do_with_node_name_label( + [ + {ok, + {NodeName, #{ + rule_specific_data := NodeRuleMetrics, + action_specific_data := NodeActionMetrics, + connector_specific_data := NodeConnectorMetrics + }}} + | Rest + ], + #{ + rule_specific_data := RuleAcc, + action_specific_data := ActionAcc, + connector_specific_data := ConnAcc + } = AccIn +) -> + do_with_node_name_label( + Rest, + AccIn#{ + rule_specific_data => zip_with_node_name(NodeName, NodeRuleMetrics, RuleAcc), + action_specific_data => zip_with_node_name(NodeName, NodeActionMetrics, ActionAcc), + connector_specific_data => zip_with_node_name(NodeName, NodeConnectorMetrics, ConnAcc) + } + ); +do_with_node_name_label([{_, _} | Rest], AccIn) -> + do_with_node_name_label(Rest, AccIn). + +zip_with_node_name(NodeName, NodeMetrics, AccIn0) -> + lists:foldl( + fun(K, AccIn) -> + NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)), + AccIn#{K => NAccL} + end, + AccIn0, + maps:keys(NodeMetrics) + ). + +do_zip_with_node_name(NodeName, NodeMetrics, AccL) -> + lists:foldl( + fun({Labels, Metric}, AccIn) -> + NLabels = [{node_name, NodeName} | Labels], + [{NLabels, Metric} | AccIn] + end, + AccL, + NodeMetrics + ). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper funcs + +boolean_to_number(true) -> 1; +boolean_to_number(false) -> 0. + +status_to_number(connected) -> 1; +status_to_number(disconnected) -> 0. + +logic_sum(N1, N2) when + (N1 > 0 andalso N2 > 0) +-> + 1; +logic_sum(_, _) -> + 0. metric_names(MetricWithType) when is_list(MetricWithType) -> [Name || {Name, _Type} <- MetricWithType]. + +aggre_or_zip_init_acc() -> + #{ + rule_specific_data => maps:from_keys(rule_specific_metric_names(), []), + action_specific_data => maps:from_keys(action_specific_metric_names(), []), + connector_specific_data => maps:from_keys(connectr_specific_metric_names(), []) + }.