From 731efd8b49dd41c1f0b684ee5761503bd2ce5d43 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 23 Jan 2024 16:57:08 +0800 Subject: [PATCH] fix(prom): cluster aggre/unaggre labels --- apps/emqx_prometheus/src/emqx_prometheus.erl | 58 +++++++----- .../src/emqx_prometheus_auth.erl | 61 ++++++++----- .../src/emqx_prometheus_cluster.erl | 90 +++++++++---------- .../src/emqx_prometheus_data_integration.erl | 56 +++++++----- 4 files changed, 151 insertions(+), 114 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 2ddfc4def..7bbfd5bd5 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -24,7 +24,7 @@ -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -241,20 +241,20 @@ add_collect_family(Name, Data, Callback, Type) -> Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). %% behaviour -fetch_data_from_local_node() -> +fetch_from_local_node(Mode) -> {node(self()), #{ - stats_data => stats_data(), - vm_data => vm_data(), - cluster_data => cluster_data(), + stats_data => stats_data(Mode), + vm_data => vm_data(Mode), + cluster_data => cluster_data(Mode), %% Metrics - emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta()), - emqx_message_data => emqx_metric_data(message_metric_meta()), - emqx_delivery_data => emqx_metric_data(delivery_metric_meta()), - emqx_client_data => emqx_metric_data(client_metric_meta()), - emqx_session_data => emqx_metric_data(session_metric_meta()), - emqx_olp_data => emqx_metric_data(olp_metric_meta()), - emqx_acl_data => emqx_metric_data(acl_metric_meta()), - emqx_authn_data => emqx_metric_data(authn_metric_meta()) + emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta(), Mode), + emqx_message_data => emqx_metric_data(message_metric_meta(), Mode), + emqx_delivery_data => emqx_metric_data(delivery_metric_meta(), Mode), + emqx_client_data => emqx_metric_data(client_metric_meta(), Mode), + emqx_session_data => emqx_metric_data(session_metric_meta(), Mode), + emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), + emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), + emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode) }}. fetch_cluster_consistented_data() -> @@ -495,11 +495,11 @@ stats_metric_meta() -> {emqx_delayed_max, counter, 'delayed.max'} ]. -stats_data() -> +stats_data(Mode) -> Stats = emqx_stats:getstats(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> - AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} + AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Stats)}]} end, #{}, stats_metric_meta() @@ -519,11 +519,18 @@ vm_metric_meta() -> {emqx_vm_used_memory, gauge, 'used_memory'} ]. -vm_data() -> +vm_data(Mode) -> VmStats = emqx_mgmt:vm_stats(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> - AccIn#{Name => [{[], ?C(MetricKAtom, VmStats)}]} + Labels = + case Mode of + node -> + []; + _ -> + [{node, node(self())}] + end, + AccIn#{Name => [{Labels, ?C(MetricKAtom, VmStats)}]} end, #{}, vm_metric_meta() @@ -539,23 +546,23 @@ cluster_metric_meta() -> {emqx_cluster_nodes_stopped, gauge, undefined} ]. -cluster_data() -> +cluster_data(Mode) -> Running = emqx:cluster_nodes(running), Stopped = emqx:cluster_nodes(stopped), #{ - emqx_cluster_nodes_running => [{[], length(Running)}], - emqx_cluster_nodes_stopped => [{[], length(Stopped)}] + emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}], + emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}] }. %%======================================== %% Metrics %%======================================== -emqx_metric_data(MetricNameTypeKeyL) -> +emqx_metric_data(MetricNameTypeKeyL, Mode) -> Metrics = emqx_metrics:all(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> - AccIn#{Name => [{[], ?C(MetricKAtom, Metrics)}]} + AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]} end, #{}, MetricNameTypeKeyL @@ -911,6 +918,13 @@ zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) -> metrics_name(MetricsAll) -> [Name || {Name, _, _} <- MetricsAll]. +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels]. + %%-------------------------------------------------------------------- %% bpapi diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index 0d0607518..e5574952c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -26,7 +26,7 @@ %% for bpapi -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -126,10 +126,10 @@ collect_metrics(Name, Metrics) -> collect_auth(Name, Metrics). %% behaviour -fetch_data_from_local_node() -> +fetch_from_local_node(Mode) -> {node(self()), #{ - authn_data => authn_data(), - authz_data => authz_data() + authn_data => authn_data(Mode), + authz_data => authz_data(Mode) }}. fetch_cluster_consistented_data() -> @@ -224,38 +224,41 @@ authn_metric_meta() -> authn_metric(names) -> emqx_prometheus_cluster:metric_names(authn_metric_meta()). --spec authn_data() -> #{Key => [Point]} when +-spec authn_data(atom()) -> #{Key => [Point]} when Key :: authn_metric_name(), Point :: {[Label], Metric}, Label :: IdLabel, IdLabel :: {id, AuthnName :: binary()}, Metric :: number(). -authn_data() -> +authn_data(Mode) -> Authns = emqx_config:get([authentication]), lists:foldl( fun(Key, AccIn) -> - AccIn#{Key => authn_backend_to_points(Key, Authns)} + AccIn#{Key => authn_backend_to_points(Mode, Key, Authns)} end, #{}, authn_metric(names) ). --spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when +-spec authn_backend_to_points(atom(), Key, list(Authn)) -> list(Point) when Key :: authn_metric_name(), Authn :: map(), Point :: {[Label], Metric}, Label :: IdLabel, IdLabel :: {id, AuthnName :: binary()}, Metric :: number(). -authn_backend_to_points(Key, Authns) -> - do_authn_backend_to_points(Key, Authns, []). +authn_backend_to_points(Mode, Key, Authns) -> + do_authn_backend_to_points(Mode, Key, Authns, []). -do_authn_backend_to_points(_K, [], AccIn) -> +do_authn_backend_to_points(_Mode, _K, [], AccIn) -> lists:reverse(AccIn); -do_authn_backend_to_points(K, [Authn | Rest], AccIn) -> +do_authn_backend_to_points(Mode, K, [Authn | Rest], AccIn) -> Id = authenticator_id(Authn), - Point = {[{id, Id}], do_metric(K, Authn, lookup_authn_metrics_local(Id))}, - do_authn_backend_to_points(K, Rest, [Point | AccIn]). + Point = { + with_node_label(Mode, [{id, Id}]), + do_metric(K, Authn, lookup_authn_metrics_local(Id)) + }, + do_authn_backend_to_points(Mode, K, Rest, [Point | AccIn]). lookup_authn_metrics_local(Id) -> case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of @@ -317,38 +320,41 @@ authz_metric_meta() -> authz_metric(names) -> emqx_prometheus_cluster:metric_names(authz_metric_meta()). --spec authz_data() -> #{Key => [Point]} when +-spec authz_data(atom()) -> #{Key => [Point]} when Key :: authz_metric_name(), Point :: {[Label], Metric}, Label :: TypeLabel, TypeLabel :: {type, AuthZType :: binary()}, Metric :: number(). -authz_data() -> +authz_data(Mode) -> Authzs = emqx_config:get([authorization, sources]), lists:foldl( fun(Key, AccIn) -> - AccIn#{Key => authz_backend_to_points(Key, Authzs)} + AccIn#{Key => authz_backend_to_points(Mode, Key, Authzs)} end, #{}, authz_metric(names) ). --spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when +-spec authz_backend_to_points(atom(), Key, list(Authz)) -> list(Point) when Key :: authz_metric_name(), Authz :: map(), Point :: {[Label], Metric}, Label :: TypeLabel, TypeLabel :: {type, AuthZType :: binary()}, Metric :: number(). -authz_backend_to_points(Key, Authzs) -> - do_authz_backend_to_points(Key, Authzs, []). +authz_backend_to_points(Mode, Key, Authzs) -> + do_authz_backend_to_points(Mode, Key, Authzs, []). -do_authz_backend_to_points(_K, [], AccIn) -> +do_authz_backend_to_points(_Mode, _K, [], AccIn) -> lists:reverse(AccIn); -do_authz_backend_to_points(K, [Authz | Rest], AccIn) -> +do_authz_backend_to_points(Mode, K, [Authz | Rest], AccIn) -> Type = maps:get(type, Authz), - Point = {[{type, Type}], do_metric(K, Authz, lookup_authz_metrics_local(Type))}, - do_authz_backend_to_points(K, Rest, [Point | AccIn]). + Point = { + with_node_label(Mode, [{type, Type}]), + do_metric(K, Authz, lookup_authz_metrics_local(Type)) + }, + do_authz_backend_to_points(Mode, K, Rest, [Point | AccIn]). lookup_authz_metrics_local(Type) -> case emqx_authz_api_sources:lookup_from_local_node(Type) of @@ -481,3 +487,10 @@ do_metric(emqx_authn_enable, #{enable := B}, _) -> emqx_prometheus_cluster:boolean_to_number(B); do_metric(K, _, Metrics) -> ?MG0(K, Metrics). + +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index ae9b47c39..02209be22 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -23,7 +23,7 @@ collect_json_data/2, aggre_cluster/3, - with_node_name_label/2, + %% with_node_name_label/2, point_to_map_fun/1, @@ -34,7 +34,7 @@ -callback fetch_cluster_consistented_data() -> map(). --callback fetch_data_from_local_node() -> {node(), map()}. +-callback fetch_from_local_node(atom()) -> {node(), map()}. -callback aggre_or_zip_init_acc() -> map(). @@ -46,23 +46,23 @@ raw_data(Module, undefined) -> %% TODO: for push gateway, the format mode should be configurable raw_data(Module, ?PROM_DATA_MODE__NODE); -raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> - AllNodesMetrics = aggre_cluster(Module), +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) -> + AllNodesMetrics = aggre_cluster(Module, Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(AllNodesMetrics, Cluster); -raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> - AllNodesMetrics = with_node_name_label(Module), +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) -> + AllNodesMetrics = zip_cluster_data(Module, Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(AllNodesMetrics, Cluster); -raw_data(Module, ?PROM_DATA_MODE__NODE) -> - {_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(), +raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) -> + {_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(LocalNodeMetrics, Cluster). -metrics_data_from_all_nodes(Module) -> +fetch_data_from_all_nodes(Module, Mode) -> Nodes = mria:running_nodes(), _ResL = emqx_prometheus_proto_v2:raw_prom_data( - Nodes, Module, fetch_data_from_local_node, [] + Nodes, Module, fetch_from_local_node, [Mode] ). collect_json_data(Data, Func) when is_function(Func, 3) -> @@ -76,17 +76,13 @@ collect_json_data(Data, Func) when is_function(Func, 3) -> collect_json_data(_, _) -> error(badarg). -aggre_cluster(Module) -> +aggre_cluster(Module, Mode) -> do_aggre_cluster( Module:logic_sum_metrics(), - metrics_data_from_all_nodes(Module), + fetch_data_from_all_nodes(Module, Mode), Module:aggre_or_zip_init_acc() ). -with_node_name_label(Module) -> - ResL = metrics_data_from_all_nodes(Module), - do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()). - aggre_cluster(LogicSumKs, ResL, Init) -> do_aggre_cluster(LogicSumKs, ResL, Init). @@ -121,61 +117,65 @@ aggre_metric(LogicSumKs, NodeMetrics, AccIn0) -> do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) -> lists:foldl( - fun({Labels, Metric}, AccIn) -> - NMetric = - case lists:member(K, LogicSumKs) of - true -> - logic_sum(Metric, ?PG0(Labels, AccIn)); - false -> - Metric + ?PG0(Labels, AccIn) - end, - [{Labels, NMetric} | AccIn] + fun(Point = {_Labels, _Metric}, AccIn) -> + sum(K, LogicSumKs, Point, AccIn) end, AccL, NodeMetrics ). -with_node_name_label(ResL, Init) -> - do_with_node_name_label(ResL, Init). +sum(K, LogicSumKs, {Labels, Metric} = Point, MetricAccL) -> + case lists:keytake(Labels, 1, MetricAccL) of + {value, {Labels, MetricAcc}, NMetricAccL} -> + NPoint = {Labels, do_sum(K, LogicSumKs, Metric, MetricAcc)}, + [NPoint | NMetricAccL]; + false -> + [Point | MetricAccL] + end. -do_with_node_name_label([], AccIn) -> +do_sum(K, LogicSumKs, Metric, MetricAcc) -> + case lists:member(K, LogicSumKs) of + true -> + logic_sum(Metric, MetricAcc); + false -> + Metric + MetricAcc + end. + +zip_cluster_data(Module, Mode) -> + zip_cluster( + fetch_data_from_all_nodes(Module, Mode), + Module:aggre_or_zip_init_acc() + ). + +zip_cluster([], AccIn) -> AccIn; -do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) -> - do_with_node_name_label( +zip_cluster([{ok, {_NodeName, NodeMetric}} | Rest], AccIn) -> + zip_cluster( Rest, maps:fold( fun(K, V, AccIn0) -> AccIn0#{ - K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0)) + K => do_zip_cluster(V, ?MG(K, AccIn0)) } end, AccIn, NodeMetric ) ); -do_with_node_name_label([{_, _} | Rest], AccIn) -> - do_with_node_name_label(Rest, AccIn). +zip_cluster([{_, _} | Rest], AccIn) -> + zip_cluster(Rest, AccIn). -zip_with_node_name(NodeName, NodeMetrics, AccIn0) -> +do_zip_cluster(NodeMetrics, AccIn0) -> lists:foldl( fun(K, AccIn) -> - NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)), + AccMetricL = ?MG(K, AccIn), + NAccL = ?MG(K, NodeMetrics) ++ AccMetricL, AccIn#{K => NAccL} end, AccIn0, maps:keys(NodeMetrics) ). -do_zip_with_node_name(NodeName, NodeMetrics, AccL) -> - lists:foldl( - fun({Labels, Metric}, AccIn) -> - NLabels = [{node, NodeName} | Labels], - [{NLabels, Metric} | AccIn] - end, - AccL, - NodeMetrics - ). - point_to_map_fun(Key) -> fun({Lables, Metric}, AccIn2) -> LablesKVMap = maps:from_list(Lables), diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 008a029a8..15fbe8106 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -31,7 +31,7 @@ %% for bpapi -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -69,13 +69,13 @@ %% Callback for emqx_prometheus_cluster %%-------------------------------------------------------------------- -fetch_data_from_local_node() -> +fetch_from_local_node(Mode) -> Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), {node(self()), #{ - rule_metric_data => rule_metric_data(Rules), - action_metric_data => action_metric_data(Bridges), - connector_metric_data => connector_metric_data(Bridges) + rule_metric_data => rule_metric_data(Mode, Rules), + action_metric_data => action_metric_data(Mode, Bridges), + connector_metric_data => connector_metric_data(Mode, Bridges) }}. fetch_cluster_consistented_data() -> @@ -325,26 +325,26 @@ rule_metric_meta() -> rule_metric(names) -> emqx_prometheus_cluster:metric_names(rule_metric_meta()). -rule_metric_data(Rules) -> +rule_metric_data(Mode, Rules) -> lists:foldl( fun(#{id := Id} = Rule, AccIn) -> - merge_acc_with_rules(Id, get_metric(Rule), AccIn) + merge_acc_with_rules(Mode, Id, get_metric(Rule), AccIn) end, maps:from_keys(rule_metric(names), []), Rules ). -merge_acc_with_rules(Id, RuleMetrics, PointsAcc) -> +merge_acc_with_rules(Mode, Id, RuleMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [rule_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, RuleMetrics ). -rule_point(Id, V) -> - {[{id, Id}], V}. +rule_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. get_metric(#{id := Id, enable := Bool} = _Rule) -> case emqx_metrics_worker:get_metrics(rule_metrics, Id) of @@ -393,27 +393,27 @@ action_metric_meta() -> action_metric(names) -> emqx_prometheus_cluster:metric_names(action_metric_meta()). -action_metric_data(Bridges) -> +action_metric_data(Mode, Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = _Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) + merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn) end, maps:from_keys(action_metric(names), []), Bridges ). -merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> +merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [action_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [action_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, BridgeMetrics ). -action_point(Id, V) -> - {[{id, Id}], V}. +action_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. get_bridge_metric(Type, Name) -> case emqx_bridge:get_metrics(Type, Name) of @@ -453,27 +453,27 @@ connector_metric_meta() -> connectr_metric(names) -> emqx_prometheus_cluster:metric_names(connector_metric_meta()). -connector_metric_data(Bridges) -> +connector_metric_data(Mode, Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) + merge_acc_with_connectors(Mode, Id, get_connector_status(Bridge), AccIn) end, maps:from_keys(connectr_metric(names), []), Bridges ). -merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) -> +merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [connector_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, ConnectorMetrics ). -connector_point(Id, V) -> - {[{id, Id}], V}. +connector_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. get_connector_status(#{resource_data := ResourceData} = _Bridge) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), @@ -532,3 +532,13 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) -> zip_json_data_integration_metrics(Key, Points, AllResultedAcc) -> ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper funcs + +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels].