emqx/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl

205 lines
5.8 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_cluster).
-include("emqx_prometheus.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([
raw_data/2,
collect_json_data/2,
aggre_cluster/3,
point_to_map_fun/1,
boolean_to_number/1,
status_to_number/1,
metric_names/1
]).
-callback fetch_cluster_consistented_data() -> map().
-callback fetch_from_local_node(atom()) -> {node(), map()}.
-callback aggre_or_zip_init_acc() -> map().
-callback logic_sum_metrics() -> list().
-define(MG(K, MAP), maps:get(K, MAP)).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
raw_data(Module, undefined) ->
%% TODO: for push gateway, the format mode should be configurable
raw_data(Module, ?PROM_DATA_MODE__NODE);
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) ->
AllNodesMetrics = aggre_cluster(Module, Mode),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) ->
AllNodesMetrics = zip_cluster_data(Module, Mode),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) ->
{_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(LocalNodeMetrics, Cluster).
fetch_data_from_all_nodes(Module, Mode) ->
Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, Module, fetch_from_local_node, [Mode]
).
collect_json_data(Data, Func) when is_function(Func, 3) ->
maps:fold(
fun(K, V, Acc) ->
Func(K, V, Acc)
end,
[],
Data
);
collect_json_data(_, _) ->
error(badarg).
aggre_cluster(Module, Mode) ->
do_aggre_cluster(
Module:logic_sum_metrics(),
fetch_data_from_all_nodes(Module, Mode),
Module:aggre_or_zip_init_acc()
).
aggre_cluster(LogicSumKs, ResL, Init) ->
do_aggre_cluster(LogicSumKs, ResL, Init).
do_aggre_cluster(_LogicSumKs, [], AccIn) ->
AccIn;
do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
do_aggre_cluster(
LogicSumKs,
Rest,
maps:fold(
fun(K, V, AccIn0) ->
AccIn0#{K => aggre_metric(LogicSumKs, V, ?MG(K, AccIn0))}
end,
AccIn,
NodeMetric
)
);
do_aggre_cluster(LogicSumKs, [{_, _} | Rest], AccIn) ->
do_aggre_cluster(LogicSumKs, Rest, AccIn).
aggre_metric(LogicSumKs, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_aggre_metric(
K, LogicSumKs, ?MG(K, NodeMetrics), ?MG(K, AccIn)
),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) ->
lists:foldl(
fun(Point = {_Labels, _Metric}, AccIn) ->
sum(K, LogicSumKs, Point, AccIn)
end,
AccL,
NodeMetrics
).
sum(K, LogicSumKs, {Labels, Metric} = Point, MetricAccL) ->
case lists:keytake(Labels, 1, MetricAccL) of
{value, {Labels, MetricAcc}, NMetricAccL} ->
NPoint = {Labels, do_sum(K, LogicSumKs, Metric, MetricAcc)},
[NPoint | NMetricAccL];
false ->
[Point | MetricAccL]
end.
do_sum(K, LogicSumKs, Metric, MetricAcc) ->
case lists:member(K, LogicSumKs) of
true ->
logic_sum(Metric, MetricAcc);
false ->
Metric + MetricAcc
end.
zip_cluster_data(Module, Mode) ->
zip_cluster(
fetch_data_from_all_nodes(Module, Mode),
Module:aggre_or_zip_init_acc()
).
zip_cluster([], AccIn) ->
AccIn;
zip_cluster([{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
zip_cluster(
Rest,
maps:fold(
fun(K, V, AccIn0) ->
AccIn0#{
K => do_zip_cluster(V, ?MG(K, AccIn0))
}
end,
AccIn,
NodeMetric
)
);
zip_cluster([{_, _} | Rest], AccIn) ->
zip_cluster(Rest, AccIn).
do_zip_cluster(NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
AccMetricL = ?MG(K, AccIn),
NAccL = ?MG(K, NodeMetrics) ++ AccMetricL,
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
point_to_map_fun(Key) ->
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
logic_sum(N1, N2) when
(N1 > 0 andalso N2 > 0)
->
1;
logic_sum(_, _) ->
0.
boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0.
status_to_number(?status_connected) -> 1;
status_to_number(?status_connecting) -> 0;
status_to_number(?status_disconnected) -> 0;
status_to_number(?rm_status_stopped) -> 0;
status_to_number(_) -> 0.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].