feat(prom_di): cluster aggregated/unaggregated metrics

This commit is contained in:
JimMoen 2024-01-18 11:23:23 +08:00
parent 9627124d67
commit c3da792323
No known key found for this signature in database
1 changed file with 246 additions and 64 deletions

View File

@ -24,6 +24,11 @@
-export([collect/1]). -export([collect/1]).
%% for bpapi
-export([
fetch_metric_data_from_local_node/0
]).
-export([add_collect_family/4]). -export([add_collect_family/4]).
-include("emqx_prometheus.hrl"). -include("emqx_prometheus.hrl").
@ -105,6 +110,12 @@
-else. -else.
-endif. -endif.
-define(LOGICAL_SUM_METRIC_NAMES, [
emqx_rule_enable,
emqx_connector_enable,
emqx_connector_status
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Collector API %% Collector API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -117,23 +128,24 @@ deregister_cleanup(_) -> ok.
_Registry :: prometheus_registry:registry(), _Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback(). Callback :: prometheus_collector:collect_mf_callback().
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
Rules = emqx_rule_engine:get_rules(), RawData = raw_data(erlang:get(format_mode)),
Bridges = emqx_bridge:list(),
%% Data Integration Overview %% Data Integration Overview
ok = add_collect_family(Callback, ?RULES_WITH_TYPE, rules_data(Rules)), ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)),
ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, connectors_data(Bridges)), ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, ?MG(connectors_data, RawData)),
ok = maybe_collect_family_schema_registry(Callback), ok = maybe_collect_family_schema_registry(Callback),
%% Rule Specific %% Rule Specific
ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, rule_specific_data(Rules)), RuleSpecificDs = ?MG(rule_specific_data, RawData),
ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, RuleSpecificDs),
%% Action Specific %% Action Specific
ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, action_specific_data(Bridges)), ActionSpecificDs = ?MG(action_specific_data, RawData),
ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, ActionSpecificDs),
%% Connector Specific %% Connector Specific
ok = add_collect_family( ConnectorSpecificDs = ?MG(connector_specific_data, RawData),
Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, connector_specific_data(Bridges) ok = add_collect_family(Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, ConnectorSpecificDs),
),
ok; ok;
collect_mf(_, _) -> collect_mf(_, _) ->
@ -141,13 +153,14 @@ collect_mf(_, _) ->
%% @private %% @private
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = raw_data(erlang:get(format_mode)),
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Bridges = emqx_bridge:list(),
#{ #{
data_integration_overview => collect_data_integration_overview(Rules, Bridges), data_integration_overview => collect_data_integration_overview(Rules, Bridges),
rules => collect_data_integration(rules, Rules), rules => collect_json_data(?MG(rule_specific_data, RawData)),
actions => collect_data_integration(actions, Bridges), actions => collect_json_data(?MG(action_specific_data, RawData)),
connectors => collect_data_integration(connectors, Bridges) connectors => collect_json_data(?MG(connector_specific_data, RawData))
}; };
collect(<<"prometheus">>) -> collect(<<"prometheus">>) ->
prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY). prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
@ -166,32 +179,23 @@ add_collect_family(Name, Data, Callback, Type) ->
collect_metrics(Name, Metrics) -> collect_metrics(Name, Metrics) ->
collect_di(Name, Metrics). collect_di(Name, Metrics).
collect_data_integration_overview(Rules, Bridges) -> %% @private
RulesD = rules_data(Rules), fetch_metric_data_from_local_node() ->
ConnectorsD = connectors_data(Bridges), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
{node(self()), #{
rule_specific_data => rule_specific_data(Rules),
action_specific_data => action_specific_data(Bridges),
connector_specific_data => connector_specific_data(Bridges)
}}.
M1 = lists:foldl( fetch_cluster_consistented_metric_data() ->
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, Rules = emqx_rule_engine:get_rules(),
#{}, Bridges = emqx_bridge:list(),
metric_names(?RULES_WITH_TYPE) (maybe_collect_schema_registry())#{
), rules_data => rules_data(Rules),
M2 = lists:foldl( connectors_data => connectors_data(Bridges)
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, }.
#{},
metric_names(?CONNECTORS_WITH_TYPE)
),
M3 = maybe_collect_schema_registry(),
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
collect_data_integration(Type, DataSeed) ->
maps:fold(
fun(K, V, Acc) ->
zip_metrics(Type, K, V, Acc)
end,
[],
di_data(Type, DataSeed)
).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
maybe_collect_family_schema_registry(Callback) -> maybe_collect_family_schema_registry(Callback) ->
@ -208,6 +212,24 @@ maybe_collect_schema_registry() ->
#{}. #{}.
-endif. -endif.
%% @private
%% Raw metric data for the requested format mode.
%%
%% In every mode the node-scoped data (rule / action / connector specific
%% metrics) is merged with the map returned by
%% fetch_cluster_consistented_metric_data/0:
%%   - `nodes_aggregated':   results from all running nodes, combined by
%%                           aggre_cluster/1.
%%   - `nodes_unaggregated': results from all running nodes, each point
%%                           labelled via with_node_name_label/1.
%%   - `node':               data of the local node only.
%%   - `undefined':          the caller never stored `format_mode' in its
%%                           process dictionary (erlang:get/1 returns
%%                           `undefined' for a missing key); handled like
%%                           `node' instead of failing with function_clause.
%%                           NOTE(review): confirm `node' is the desired default.
raw_data(nodes_aggregated) ->
    AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()),
    maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(nodes_unaggregated) ->
    AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()),
    maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(node) ->
    {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
    maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data());
raw_data(undefined) ->
    %% No mode was set for this process: use the cheapest one (no RPC).
    raw_data(node).
%% @private
%% Calls fetch_metric_data_from_local_node/0 of this module on every node
%% reported by mria:running_nodes/0, through the emqx_prometheus_proto_v2
%% bpapi, and returns the list of per-node results unchanged.
metrics_data_from_all_nodes() ->
    emqx_prometheus_proto_v2:raw_prom_data(
        mria:running_nodes(),
        ?MODULE,
        fetch_metric_data_from_local_node,
        []
    ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Collector %% Collector
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -352,7 +374,7 @@ rule_specific_data(Rules) ->
fun(#{id := Id} = Rule, AccIn) -> fun(#{id := Id} = Rule, AccIn) ->
merge_acc_with_rules(Id, get_metric(Rule), AccIn) merge_acc_with_rules(Id, get_metric(Rule), AccIn)
end, end,
maps:from_keys(metric_names(?RULES_SPECIFIC_WITH_TYPE), []), maps:from_keys(rule_specific_metric_names(), []),
Rules Rules
). ).
@ -388,6 +410,9 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
} }
end. end.
%% @private
%% Metric names of the rule-specific family, i.e. the `Name' part of every
%% `{Name, Type}' entry in ?RULES_SPECIFIC_WITH_TYPE.
rule_specific_metric_names() ->
metric_names(?RULES_SPECIFIC_WITH_TYPE).
%%==================== %%====================
%% Specific Action %% Specific Action
%% With action_id: `{type}:{name}` as label key: `action_id` %% With action_id: `{type}:{name}` as label key: `action_id`
@ -398,7 +423,7 @@ action_specific_data(Bridges) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
end, end,
maps:from_keys(metric_names(?ACTION_SPECIFIC_WITH_TYPE), []), maps:from_keys(action_specific_metric_names(), []),
Bridges Bridges
). ).
@ -439,6 +464,9 @@ get_bridge_metric(Type, Name) ->
} }
end. end.
%% @private
%% Metric names of the action-specific family, i.e. the `Name' part of every
%% `{Name, Type}' entry in ?ACTION_SPECIFIC_WITH_TYPE.
action_specific_metric_names() ->
metric_names(?ACTION_SPECIFIC_WITH_TYPE).
%%==================== %%====================
%% Specific Connector %% Specific Connector
%% With connector_id: `{type}:{name}` as label key: `connector_id` %% With connector_id: `{type}:{name}` as label key: `connector_id`
@ -449,7 +477,7 @@ connector_specific_data(Bridges) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
end, end,
maps:from_keys(metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE), []), maps:from_keys(connectr_specific_metric_names(), []),
Bridges Bridges
). ).
@ -473,24 +501,47 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
emqx_connector_status => status_to_number(Status) emqx_connector_status => status_to_number(Status)
}. }.
%%-------------------------------------------------------------------- connectr_specific_metric_names() ->
metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Help funcs %% Collect functions
%%--------------------------------------------------------------------
boolean_to_number(true) -> 1; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
boolean_to_number(false) -> 0. %% merge / zip formatting funcs for type `application/json`
status_to_number(connected) -> 1; collect_data_integration_overview(Rules, Bridges) ->
status_to_number(disconnected) -> 0. RulesD = rules_data(Rules),
ConnectorsD = connectors_data(Bridges),
zip_metrics(Type, K, V, Acc) -> M1 = lists:foldl(
LabelK = label_key(Type), fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
do_zip_metrics(LabelK, K, V, Acc). #{},
metric_names(?RULES_WITH_TYPE)
),
M2 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
#{},
metric_names(?CONNECTORS_WITH_TYPE)
),
M3 = maybe_collect_schema_registry(),
do_zip_metrics(LabelK, Key, Points, [] = _AccIn) -> lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
%% @private
%% Turns a `#{MetricName => Points}' map into the JSON-oriented list form by
%% threading every entry through zip_json_metrics/3, starting from `[]'.
collect_json_data(Data) ->
    maps:fold(fun zip_json_metrics/3, [], Data).
zip_json_metrics(Key, Points, [] = _AccIn) ->
lists:foldl( lists:foldl(
fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK -> fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
%% for initialized empty AccIn %% for initialized empty AccIn
%% The following fields will be put into Result %% The following fields will be put into Result
%% For Rules: %% For Rules:
@ -500,19 +551,17 @@ do_zip_metrics(LabelK, Key, Points, [] = _AccIn) ->
%% FOR Connectors %% FOR Connectors
%% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID %% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
%% formatted with {type}:{name} %% formatted with {type}:{name}
Point = Point = LablesKVMap#{Key => Metric},
#{
LabelK => LabelV, Key => Metric
},
[Point | AccIn2] [Point | AccIn2]
end, end,
[], [],
Points Points
); );
do_zip_metrics(LabelK, Key, Points, AllResultedAcc) -> zip_json_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl( ThisKeyResult = lists:foldl(
fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK -> fun({Lables, Metric}, AccIn2) ->
[#{LabelK => Id, Key => Metric} | AccIn2] LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end, end,
[], [],
Points Points
@ -525,13 +574,146 @@ do_zip_metrics(LabelK, Key, Points, AllResultedAcc) ->
ThisKeyResult ThisKeyResult
). ).
di_data(rules, Rules) -> rule_specific_data(Rules); %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
di_data(actions, Bridges) -> action_specific_data(Bridges); %% merge / zip formatting funcs for type `text/plain`
di_data(connectors, Bridges) -> connector_specific_data(Bridges). aggre_cluster(ResL) ->
do_aggre_cluster(ResL, aggre_or_zip_init_acc()).
label_key(rules) -> id; do_aggre_cluster([], AccIn) ->
label_key(actions) -> id; AccIn;
label_key(connectors) -> id. do_aggre_cluster(
[
{ok,
{_NodeName, #{
rule_specific_data := NodeRuleMetrics,
action_specific_data := NodeActionMetrics,
connector_specific_data := NodeConnectorMetrics
}}}
| Rest
],
#{
rule_specific_data := RuleAcc,
action_specific_data := ActionAcc,
connector_specific_data := ConnAcc
} = AccIn
) ->
do_aggre_cluster(
Rest,
AccIn#{
%% TODO
rule_specific_data => do_aggre_metric(NodeRuleMetrics, RuleAcc),
action_specific_data => do_aggre_metric(NodeActionMetrics, ActionAcc),
connector_specific_data => do_aggre_metric(NodeConnectorMetrics, ConnAcc)
}
);
do_aggre_cluster([{_, _} | Rest], AccIn) ->
do_aggre_cluster(Rest, AccIn).
%% @private
%% Merges one node's `#{MetricName => Points}' map into the accumulator map:
%% for every metric name reported by the node, its points are combined with
%% the points already accumulated under the same name (do_aggre_metric/3).
%% The accumulator must already contain every metric name the node reports.
do_aggre_metric(NodeMetrics, AccIn0) ->
    maps:fold(
        fun(Name, NodePoints, Acc) ->
            Acc#{Name => do_aggre_metric(Name, NodePoints, ?MG(Name, Acc))}
        end,
        AccIn0,
        NodeMetrics
    ).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
%% @private
%% Combines one node's points for metric `K' with the points accumulated so
%% far. A point is `{Labels, Value}'; points are matched on `Labels'.
%%
%%   - Labels not seen before: the node's point is taken as-is.
%%   - Labels already accumulated: the existing entry is replaced (never
%%     duplicated) by the combined value, which is `logic_sum/2' for metrics
%%     listed in ?LOGICAL_SUM_METRIC_NAMES and plain addition otherwise.
%%
%% Taking the first point as-is matters for the logical metrics: combining it
%% with a default of 0 would force the result to 0 for the whole cluster,
%% because logic_sum/2 yields 1 only when both operands are positive.
do_aggre_metric(K, NodeMetrics, AccL) ->
    IsLogical = lists:member(K, ?LOGICAL_SUM_METRIC_NAMES),
    lists:foldl(
        fun({Labels, Metric} = Point, AccIn) ->
            case lists:keytake(Labels, 1, AccIn) of
                {value, {_, MetricAcc}, RestAcc} when IsLogical ->
                    [{Labels, logic_sum(Metric, MetricAcc)} | RestAcc];
                {value, {_, MetricAcc}, RestAcc} ->
                    [{Labels, Metric + MetricAcc} | RestAcc];
                false ->
                    [Point | AccIn]
            end
        end,
        AccL,
        NodeMetrics
    ).
%% @private
%% Collects the per-node results in `ResL' into one map in which every point
%% carries the originating node as an extra label. Starts from the empty
%% accumulator built by aggre_or_zip_init_acc/0.
with_node_name_label(ResL) ->
    InitAcc = aggre_or_zip_init_acc(),
    do_with_node_name_label(ResL, InitAcc).
%% @private
%% Walks the per-node results from left to right.
%%   - `{ok, {Node, Data}}' with the three expected data sets: every point of
%%     `Data' is added to the accumulator with the node label attached.
%%   - any other 2-tuple (e.g. a failed call): skipped, accumulator unchanged.
do_with_node_name_label(ResL, AccIn0) ->
    lists:foldl(
        fun
            (
                {ok,
                    {Node, #{
                        rule_specific_data := RulePoints,
                        action_specific_data := ActionPoints,
                        connector_specific_data := ConnPoints
                    }}},
                #{
                    rule_specific_data := RuleAcc,
                    action_specific_data := ActionAcc,
                    connector_specific_data := ConnAcc
                } = Acc
            ) ->
                Acc#{
                    rule_specific_data => zip_with_node_name(Node, RulePoints, RuleAcc),
                    action_specific_data => zip_with_node_name(Node, ActionPoints, ActionAcc),
                    connector_specific_data => zip_with_node_name(Node, ConnPoints, ConnAcc)
                };
            ({_, _}, Acc) ->
                Acc
        end,
        AccIn0,
        ResL
    ).
%% @private
%% For every metric name reported by the node, prepends the node's points
%% (labelled with `NodeName') to the points already accumulated under that
%% name. The accumulator must already contain every reported metric name.
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
    maps:fold(
        fun(Name, NodePoints, Acc) ->
            Acc#{Name => do_zip_with_node_name(NodeName, NodePoints, ?MG(Name, Acc))}
        end,
        AccIn0,
        NodeMetrics
    ).
%% @private
%% Puts `{node_name, NodeName}' in front of the labels of every point in
%% `NodeMetrics' and pushes the points onto `AccL' one by one, so they end up
%% in reverse input order ahead of the existing accumulator entries.
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
    Labelled = lists:map(
        fun({Labels, Metric}) ->
            {[{node_name, NodeName} | Labels], Metric}
        end,
        NodeMetrics
    ),
    lists:reverse(Labelled, AccL).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
%% Gauge encoding of a boolean flag: `false' is 0, `true' is 1.
boolean_to_number(false) -> 0;
boolean_to_number(true) -> 1.
%% Gauge encoding of a connector status: 1 when `connected', 0 otherwise.
%%
%% Every status other than `connected' maps to 0 so that a connector in a
%% transitional state cannot abort the whole metrics collection with a
%% function_clause error.
%% NOTE(review): statuses such as `connecting' or `stopped' are presumed
%% possible here - confirm against the resource manager.
status_to_number(connected) -> 1;
status_to_number(_NotConnected) -> 0.
%% Combines two 0/1-style gauge values: 1 only when both are positive,
%% 0 in every other case (i.e. it behaves like a logical AND).
logic_sum(N1, N2) when N1 > 0, N2 > 0 -> 1;
logic_sum(_N1, _N2) -> 0.
metric_names(MetricWithType) when is_list(MetricWithType) -> metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType]. [Name || {Name, _Type} <- MetricWithType].
%% @private
%% Empty accumulator shared by the aggregating and the node-labelling folds:
%% one entry per data set, each mapping every known metric name to `[]'.
aggre_or_zip_init_acc() ->
    NoPoints = fun(Names) -> maps:from_keys(Names, []) end,
    #{
        rule_specific_data => NoPoints(rule_specific_metric_names()),
        action_specific_data => NoPoints(action_specific_metric_names()),
        connector_specific_data => NoPoints(connectr_specific_metric_names())
    }.