feat(prom_auth): cluster metrics with different format-mode

This commit is contained in:
JimMoen 2024-01-17 17:18:46 +08:00
parent fb330f77e6
commit 57f3efde63
No known key found for this signature in database
4 changed files with 290 additions and 87 deletions

View File

@ -58,6 +58,7 @@
{emqx_persistent_session_ds,1}.
{emqx_plugins,1}.
{emqx_prometheus,1}.
{emqx_prometheus,2}.
{emqx_resource,1}.
{emqx_retainer,1}.
{emqx_retainer,2}.

View File

@ -43,6 +43,8 @@
data_integration/2
]).
-export([lookup_from_local_nodes/3]).
-define(TAGS, [<<"Monitor">>]).
-define(IS_TRUE(Val), ((Val =:= true) orelse (Val =:= <<"true">>))).
-define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))).
@ -138,6 +140,10 @@ fields(format_mode) ->
)}
].
%% bpapi
lookup_from_local_nodes(M, F, A) ->
erlang:apply(M, F, A).
%%--------------------------------------------------------------------
%% API Handler funcs
%%--------------------------------------------------------------------
@ -195,11 +201,11 @@ response_type(#{<<"accept">> := <<"application/json">>}) ->
response_type(_) ->
<<"prometheus">>.
format_mode(#{<<"format_mode">> := <<"node">>}) ->
format_mode(#{<<"format_mode">> := node}) ->
node;
format_mode(#{<<"format_mode">> := <<"nodes_aggregated">>}) ->
format_mode(#{<<"format_mode">> := nodes_aggregated}) ->
nodes_aggregated;
format_mode(#{<<"format_mode">> := <<"nodes_unaggregated">>}) ->
format_mode(#{<<"format_mode">> := nodes_unaggregated}) ->
nodes_unaggregated;
format_mode(_) ->
node.

View File

@ -23,6 +23,11 @@
-export([collect/1]).
%% for bpapi
-export([
fetch_metric_data_from_local_node/0
]).
-include("emqx_prometheus.hrl").
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
-include_lib("prometheus/include/prometheus.hrl").
@ -65,6 +70,7 @@
-define(MG(K, MAP), maps:get(K, MAP)).
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
-define(AUTHNS_WITH_TYPE, [
{emqx_authn_enable, gauge},
@ -96,6 +102,13 @@
{emqx_banned_count, gauge}
]).
-define(LOGICAL_SUM_METRIC_NAMES, [
emqx_authn_enable,
emqx_authn_status,
emqx_authz_enable,
emqx_authz_status
]).
%%--------------------------------------------------------------------
%% Collector API
%%--------------------------------------------------------------------
@ -109,37 +122,29 @@ deregister_cleanup(_) -> ok.
Callback :: prometheus_collector:collect_mf_callback().
%% erlfmt-ignore
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, authn_data()),
ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, authn_users_count_data()),
ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, authz_data()),
ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, authz_rules_count_data()),
ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, banned_count_data()),
RawData = raw_data(erlang:get(format_mode)),
ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)),
ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)),
ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)),
ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, ?MG(authz_rules_count, RawData)),
ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, ?MG(banned_count, RawData)),
ok;
collect_mf(_, _) ->
ok.
%% @private
collect(<<"json">>) ->
FormatMode = erlang:get(format_mode),
RawData = raw_data(FormatMode),
%% TODO: merge node name in json format
#{
emqx_authn => collect_auth_data(authn),
emqx_authz => collect_auth_data(authz),
emqx_authn => collect_json_data(?MG(authn, RawData)),
emqx_authz => collect_json_data(?MG(authz, RawData)),
emqx_banned => collect_banned_data()
};
collect(<<"prometheus">>) ->
prometheus_text_format:format(?PROMETHEUS_AUTH_REGISTRY).
collect_auth_data(AuthDataType) ->
maps:fold(
fun(K, V, Acc) ->
zip_auth_metrics(AuthDataType, K, V, Acc)
end,
[],
auth_data(AuthDataType)
).
collect_banned_data() ->
#{emqx_banned_count => banned_count_data()}.
add_collect_family(Callback, MetricWithType, Data) ->
_ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
ok.
@ -150,6 +155,38 @@ add_collect_family(Name, Data, Callback, Type) ->
collect_metrics(Name, Metrics) ->
collect_auth(Name, Metrics).
%% @private
fetch_metric_data_from_local_node() ->
{node(self()), #{
authn => authn_data(),
authz => authz_data()
}}.
fetch_cluster_consistented_metric_data() ->
#{
authn_users_count => authn_users_count_data(),
authz_rules_count => authz_rules_count_data(),
banned_count => banned_count_data()
}.
%% raw data for different format modes
raw_data(nodes_aggregated) ->
AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()),
maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(nodes_unaggregated) ->
%% then fold from all nodes
AllNodesMetrics = with_node_name_label(all_nodes_metrics()),
maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(node) ->
{_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()).
all_nodes_metrics() ->
Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, ?MODULE, fetch_metric_data_from_local_node, []
).
%%--------------------------------------------------------------------
%% Collector
%%--------------------------------------------------------------------
@ -370,9 +407,174 @@ banned_count_data() ->
mnesia_size(?BANNED_TABLE).
%%--------------------------------------------------------------------
%% Helper functions
%% Collect functions
%%--------------------------------------------------------------------
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json`
collect_json_data(Data) ->
maps:fold(
fun(K, V, Acc) ->
zip_json_metrics(K, V, Acc)
end,
[],
Data
).
collect_banned_data() ->
#{emqx_banned_count => banned_count_data()}.
zip_json_metrics(Key, Points, [] = _AccIn) ->
lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Authn:
%% `id`, `emqx_authn_users_count`
%% For Authz:
%% `type`, `emqx_authz_rules_count`n
Point = (maps:merge(LablesKVMap, users_or_rule_count(LablesKVMap)))#{Key => Metric},
[Point | AccIn2]
end,
[],
Points
);
zip_json_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end,
[],
Points
),
lists:zipwith(
fun(AllResulted, ThisKeyMetricOut) ->
maps:merge(AllResulted, ThisKeyMetricOut)
end,
AllResultedAcc,
ThisKeyResult
).
user_rule_data(authn) -> authn_users_count_data();
user_rule_data(authz) -> authz_rules_count_data().
users_or_rule_count(#{id := Id}) ->
#{emqx_authn_users_count := Points} = user_rule_data(authn),
case lists:keyfind([{id, Id}], 1, Points) of
{_, Metric} ->
#{emqx_authn_users_count => Metric};
false ->
#{}
end;
users_or_rule_count(#{type := Type}) ->
#{emqx_authz_rules_count := Points} = user_rule_data(authz),
case lists:keyfind([{type, Type}], 1, Points) of
{_, Metric} ->
#{emqx_authz_rules_count => Metric};
false ->
#{}
end;
users_or_rule_count(_) ->
#{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `text/plain`
aggre_cluster(ResL) ->
do_aggre_cluster(ResL, aggre_or_zip_init_acc()).
do_aggre_cluster([], AccIn) ->
AccIn;
do_aggre_cluster(
[{ok, {_NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
#{authn := AuthnAcc, authz := AuthzAcc} = AccIn
) ->
do_aggre_cluster(
Rest,
AccIn#{
authn => do_aggre_metric(NodeAuthnMetrics, AuthnAcc),
authz => do_aggre_metric(NodeAuthzMetrics, AuthzAcc)
}
);
do_aggre_cluster([{_, _} | Rest], AccIn) ->
do_aggre_cluster(Rest, AccIn).
do_aggre_metric(NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_aggre_metric(K, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_aggre_metric(K, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NMetric =
case lists:member(K, ?LOGICAL_SUM_METRIC_NAMES) of
true ->
logic_sum(Metric, ?PG0(Labels, AccIn));
false ->
Metric + ?PG0(Labels, AccIn)
end,
[{Labels, NMetric} | AccIn]
end,
AccL,
NodeMetrics
).
logic_sum(N1, N2) when
(N1 > 0 andalso N2 > 0)
->
1;
logic_sum(_, _) ->
0.
with_node_name_label(ResL) ->
do_with_node_name_label(ResL, aggre_or_zip_init_acc()).
do_with_node_name_label([], AccIn) ->
AccIn;
do_with_node_name_label(
[{ok, {NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
#{authn := AuthnAcc, authz := AuthzAcc} = AccIn
) ->
do_with_node_name_label(
Rest,
AccIn#{
authn => zip_with_node_name(NodeName, NodeAuthnMetrics, AuthnAcc),
authz => zip_with_node_name(NodeName, NodeAuthzMetrics, AuthzAcc)
}
);
do_with_node_name_label([{_, _} | Rest], AccIn) ->
do_with_node_name_label(Rest, AccIn).
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NLabels = [{node_name, NodeName} | Labels],
[{NLabels, Metric} | AccIn]
end,
AccL,
NodeMetrics
).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
authenticator_id(Authn) ->
emqx_authn_chains:authenticator_id(Authn).
@ -398,69 +600,11 @@ boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
status_to_number(stopped) -> 0.
zip_auth_metrics(AuthDataType, K, V, Acc) ->
LabelK = label_key(AuthDataType),
UserOrRuleD = user_rule_data(AuthDataType),
do_zip_auth_metrics(LabelK, UserOrRuleD, K, V, Acc).
do_zip_auth_metrics(LabelK, UserOrRuleD, Key, Points, [] = _AccIn) ->
lists:foldl(
fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK ->
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Authn:
%% `id`, `emqx_authn_users_count`
%% For Authz:
%% `type`, `emqx_authz_rules_count`
Point = (users_or_rule_count(LabelK, LabelV, UserOrRuleD))#{
LabelK => LabelV, Key => Metric
},
[Point | AccIn2]
end,
[],
Points
);
do_zip_auth_metrics(LabelK, _UserOrRuleD, Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(
fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK ->
[#{LabelK => Id, Key => Metric} | AccIn2]
end,
[],
Points
),
lists:zipwith(
fun(AllResulted, ThisKeyMetricOut) ->
maps:merge(AllResulted, ThisKeyMetricOut)
end,
AllResultedAcc,
ThisKeyResult
).
auth_data(authn) -> authn_data();
auth_data(authz) -> authz_data().
label_key(authn) -> id;
label_key(authz) -> type.
user_rule_data(authn) -> authn_users_count_data();
user_rule_data(authz) -> authz_rules_count_data().
users_or_rule_count(id, Id, #{emqx_authn_users_count := Points} = _AuthnUsersD) ->
case lists:keyfind([{id, Id}], 1, Points) of
{_, Metric} ->
#{emqx_authn_users_count => Metric};
false ->
#{}
end;
users_or_rule_count(type, Type, #{emqx_authz_rules_count := Points} = _AuthzRulesD) ->
case lists:keyfind([{type, Type}], 1, Points) of
{_, Metric} ->
#{emqx_authz_rules_count => Metric};
false ->
#{}
end;
users_or_rule_count(_, _, _) ->
#{}.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].
aggre_or_zip_init_acc() ->
#{
authn => maps:from_keys(authn_metric_names(), []),
authz => maps:from_keys(authz_metric_names(), [])
}.

View File

@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
start/1,
stop/1,
raw_prom_data/4
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.5.0".
-spec start([node()]) -> emqx_rpc:multicall_result().
start(Nodes) ->
rpc:multicall(Nodes, emqx_prometheus, do_start, [], 5000).
-spec stop([node()]) -> emqx_rpc:multicall_result().
stop(Nodes) ->
rpc:multicall(Nodes, emqx_prometheus, do_stop, [], 5000).
-type key() :: atom() | binary() | [byte()].
-spec raw_prom_data([node()], key(), key(), key()) -> emqx_rpc:erpc_multicall(term()).
raw_prom_data(Nodes, M, F, A) ->
erpc:multicall(
Nodes,
emqx_prometheus_api,
lookup_from_local_nodes,
[M, F, A],
5000
).