From 57f3efde63dd254952754060faa9773ced15a860 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 17 Jan 2024 17:18:46 +0800 Subject: [PATCH] feat(prom_auth): cluster metrics with different format-mode --- apps/emqx/priv/bpapi.versions | 1 + .../src/emqx_prometheus_api.erl | 12 +- .../src/emqx_prometheus_auth.erl | 312 +++++++++++++----- .../src/proto/emqx_prometheus_proto_v2.erl | 52 +++ 4 files changed, 290 insertions(+), 87 deletions(-) create mode 100644 apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v2.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9bd824242..859d7fbe0 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -58,6 +58,7 @@ {emqx_persistent_session_ds,1}. {emqx_plugins,1}. {emqx_prometheus,1}. +{emqx_prometheus,2}. {emqx_resource,1}. {emqx_retainer,1}. {emqx_retainer,2}. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 9263b6a6a..ea71e7ee2 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -43,6 +43,8 @@ data_integration/2 ]). +-export([lookup_from_local_nodes/3]). + -define(TAGS, [<<"Monitor">>]). -define(IS_TRUE(Val), ((Val =:= true) orelse (Val =:= <<"true">>))). -define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))). @@ -138,6 +140,10 @@ fields(format_mode) -> )} ]. +%% bpapi +lookup_from_local_nodes(M, F, A) -> + erlang:apply(M, F, A). + %%-------------------------------------------------------------------- %% API Handler funcs %%-------------------------------------------------------------------- @@ -195,11 +201,11 @@ response_type(#{<<"accept">> := <<"application/json">>}) -> response_type(_) -> <<"prometheus">>. 
-format_mode(#{<<"format_mode">> := <<"node">>}) -> +format_mode(#{<<"format_mode">> := node}) -> node; -format_mode(#{<<"format_mode">> := <<"nodes_aggregated">>}) -> +format_mode(#{<<"format_mode">> := nodes_aggregated}) -> nodes_aggregated; -format_mode(#{<<"format_mode">> := <<"nodes_unaggregated">>}) -> +format_mode(#{<<"format_mode">> := nodes_unaggregated}) -> nodes_unaggregated; format_mode(_) -> node. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index 57406d2d2..3e9a9d007 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -23,6 +23,11 @@ -export([collect/1]). +%% for bpapi +-export([ + fetch_metric_data_from_local_node/0 +]). + -include("emqx_prometheus.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("prometheus/include/prometheus.hrl"). @@ -65,6 +70,7 @@ -define(MG(K, MAP), maps:get(K, MAP)). -define(MG0(K, MAP), maps:get(K, MAP, 0)). +-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). -define(AUTHNS_WITH_TYPE, [ {emqx_authn_enable, gauge}, @@ -96,6 +102,13 @@ {emqx_banned_count, gauge} ]). +-define(LOGICAL_SUM_METRIC_NAMES, [ + emqx_authn_enable, + emqx_authn_status, + emqx_authz_enable, + emqx_authz_status +]). + %%-------------------------------------------------------------------- %% Collector API %%-------------------------------------------------------------------- @@ -109,37 +122,29 @@ deregister_cleanup(_) -> ok. Callback :: prometheus_collector:collect_mf_callback(). 
%% erlfmt-ignore collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) -> - ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, authn_data()), - ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, authn_users_count_data()), - ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, authz_data()), - ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, authz_rules_count_data()), - ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, banned_count_data()), + RawData = raw_data(erlang:get(format_mode)), + ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)), + ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)), + ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)), + ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, ?MG(authz_rules_count, RawData)), + ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, ?MG(banned_count, RawData)), ok; collect_mf(_, _) -> ok. %% @private collect(<<"json">>) -> + FormatMode = erlang:get(format_mode), + RawData = raw_data(FormatMode), + %% TODO: merge node name in json format #{ - emqx_authn => collect_auth_data(authn), - emqx_authz => collect_auth_data(authz), + emqx_authn => collect_json_data(?MG(authn, RawData)), + emqx_authz => collect_json_data(?MG(authz, RawData)), emqx_banned => collect_banned_data() }; collect(<<"prometheus">>) -> prometheus_text_format:format(?PROMETHEUS_AUTH_REGISTRY). -collect_auth_data(AuthDataType) -> - maps:fold( - fun(K, V, Acc) -> - zip_auth_metrics(AuthDataType, K, V, Acc) - end, - [], - auth_data(AuthDataType) - ). - -collect_banned_data() -> - #{emqx_banned_count => banned_count_data()}. - add_collect_family(Callback, MetricWithType, Data) -> _ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType], ok. @@ -150,6 +155,38 @@ add_collect_family(Name, Data, Callback, Type) -> collect_metrics(Name, Metrics) -> collect_auth(Name, Metrics). 
+%% @private
+%% Authn/authz metric points of the local node, tagged with the node name.
+%% Exported for the bpapi call issued by all_nodes_metrics/0.
+fetch_metric_data_from_local_node() ->
+    {node(), #{
+        authn => authn_data(),
+        authz => authz_data()
+    }}.
+
+%% Metrics that are read once on the local node and merged into every format
+%% mode as-is (never aggregated or labelled per node).
+fetch_cluster_consistented_metric_data() ->
+    #{
+        authn_users_count => authn_users_count_data(),
+        authz_rules_count => authz_rules_count_data(),
+        banned_count => banned_count_data()
+    }.
+
+%% Raw metric data for the requested format mode.
+%%
+%% The mode is read by the callers with `erlang:get(format_mode)`, which
+%% yields `undefined` when the calling process never stored one in its
+%% process dictionary. That case falls back to `node`, the same default
+%% emqx_prometheus_api:format_mode/1 uses, instead of crashing with
+%% `function_clause`.
+raw_data(nodes_aggregated) ->
+    AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()),
+    maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
+raw_data(nodes_unaggregated) ->
+    %% every point gets a `node_name` label, nothing is summed
+    AllNodesMetrics = with_node_name_label(all_nodes_metrics()),
+    maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
+raw_data(node) ->
+    {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
+    maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data());
+raw_data(undefined) ->
+    raw_data(node).
+
+%% One erpc result per running node: `{ok, {Node, Metrics}}` on success,
+%% another 2-tuple (`{Class, Reason}`) when the call to that node failed.
+all_nodes_metrics() ->
+    emqx_prometheus_proto_v2:raw_prom_data(
+        mria:running_nodes(), ?MODULE, fetch_metric_data_from_local_node, []
+    ).
+
 %%--------------------------------------------------------------------
 %% Collector
 %%--------------------------------------------------------------------
@@ -370,9 +407,174 @@ banned_count_data() ->
     mnesia_size(?BANNED_TABLE).
 
 %%--------------------------------------------------------------------
-%% Helper functions
+%% Collect functions
 %%--------------------------------------------------------------------
 
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% merge / zip formatting funcs for type `application/json`
+
+%% Turns a `#{MetricName => [{Labels, Value}]}` map into a list of JSON
+%% objects by zipping the points of every metric name together.
+collect_json_data(Data) ->
+    maps:fold(
+        fun(K, V, Acc) ->
+            zip_json_metrics(K, V, Acc)
+        end,
+        [],
+        Data
+    ).
+
+collect_banned_data() ->
+    #{emqx_banned_count => banned_count_data()}.
+
+%% Zips the points of one metric (`Key`) into the JSON accumulator.
+%%
+%% With an empty accumulator every point becomes a fresh map made of its
+%% labels, the matching users/rules count (if any) and `Key => Metric`:
+%%   For Authn: `id`, `emqx_authn_users_count`
+%%   For Authz: `type`, `emqx_authz_rules_count`
+%% Otherwise the points are merged position by position into the maps that
+%% are already accumulated.
+zip_json_metrics(Key, Points, []) ->
+    Seed = fun({Labels, Metric}) ->
+        LabelsMap = maps:from_list(Labels),
+        WithCount = maps:merge(LabelsMap, users_or_rule_count(LabelsMap)),
+        WithCount#{Key => Metric}
+    end,
+    %% reversed, so later metrics (built the same way) line up by position
+    lists:reverse(lists:map(Seed, Points));
+zip_json_metrics(Key, Points, ResultAcc) ->
+    ToMap = fun({Labels, Metric}) ->
+        (maps:from_list(Labels))#{Key => Metric}
+    end,
+    KeyMaps = lists:reverse(lists:map(ToMap, Points)),
+    lists:zipwith(fun maps:merge/2, ResultAcc, KeyMaps).
+
+user_rule_data(authn) -> authn_users_count_data();
+user_rule_data(authz) -> authz_rules_count_data().
+
+%% Looks up the users count (authn, keyed by `id`) or rules count (authz,
+%% keyed by `type`) that belongs to a label map; `#{}` when there is none.
+users_or_rule_count(#{id := Id}) ->
+    #{emqx_authn_users_count := Points} = user_rule_data(authn),
+    count_for_labels(emqx_authn_users_count, [{id, Id}], Points);
+users_or_rule_count(#{type := Type}) ->
+    #{emqx_authz_rules_count := Points} = user_rule_data(authz),
+    count_for_labels(emqx_authz_rules_count, [{type, Type}], Points);
+users_or_rule_count(_) ->
+    #{}.
+
+%% `#{CountKey => Value}` for the point whose label list equals `Labels`.
+count_for_labels(CountKey, Labels, Points) ->
+    case lists:keyfind(Labels, 1, Points) of
+        {_, Metric} -> #{CountKey => Metric};
+        false -> #{}
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% merge / zip formatting funcs for type `text/plain`
+
+%% Aggregates the per-node results into one authn/authz metric map.
+aggre_cluster(ResL) ->
+    do_aggre_cluster(ResL, aggre_or_zip_init_acc()).
+
+%% Folds the erpc results of all nodes into the accumulator. Only
+%% `{ok, {Node, #{authn := _, authz := _}}}` entries contribute; any other
+%% 2-tuple (e.g. a failed call) is skipped.
+do_aggre_cluster([], AccIn) ->
+    AccIn;
+do_aggre_cluster(
+    [{ok, {_NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
+    #{authn := AuthnAcc, authz := AuthzAcc} = AccIn
+) ->
+    do_aggre_cluster(
+        Rest,
+        AccIn#{
+            authn => do_aggre_metric(NodeAuthnMetrics, AuthnAcc),
+            authz => do_aggre_metric(NodeAuthzMetrics, AuthzAcc)
+        }
+    );
+do_aggre_cluster([{_, _} | Rest], AccIn) ->
+    do_aggre_cluster(Rest, AccIn).
+
+%% Merges one node's `#{MetricName => Points}` into the accumulated map.
+do_aggre_metric(NodeMetrics, AccIn0) ->
+    lists:foldl(
+        fun(K, AccIn) ->
+            NAccL = do_aggre_metric(K, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
+            AccIn#{K => NAccL}
+        end,
+        AccIn0,
+        maps:keys(NodeMetrics)
+    ).
+
+%% Merges one node's points for metric `K` into the accumulated point list,
+%% keeping exactly one `{Labels, Value}` entry per label set.
+%%
+%% The first value seen for a label set is stored unchanged; later values are
+%% combined with it (logical AND for the names in ?LOGICAL_SUM_METRIC_NAMES,
+%% arithmetic sum otherwise). Seeding with the first value rather than `0`
+%% matters for the logical metrics: AND-ing against an initial `0` could
+%% never yield `1`.
+do_aggre_metric(K, NodeMetrics, AccL) ->
+    IsLogical = lists:member(K, ?LOGICAL_SUM_METRIC_NAMES),
+    lists:foldl(
+        fun({Labels, Metric}, AccIn) ->
+            NMetric =
+                case lists:keyfind(Labels, 1, AccIn) of
+                    {_, Prev} -> combine_metric(IsLogical, Metric, Prev);
+                    false -> Metric
+                end,
+            %% replaces the existing entry in place, or appends a new one
+            lists:keystore(Labels, 1, AccIn, {Labels, NMetric})
+        end,
+        AccL,
+        NodeMetrics
+    ).
+
+combine_metric(true, Metric, Prev) -> logic_sum(Metric, Prev);
+combine_metric(false, Metric, Prev) -> Metric + Prev.
+
+%% Logical AND over 0/1 gauges: 1 only when both operands are positive.
+logic_sum(N1, N2) when N1 > 0 andalso N2 > 0 ->
+    1;
+logic_sum(_, _) ->
+    0.
+
+%% Concatenates the points of all nodes, adding a `node_name` label to each.
+with_node_name_label(ResL) ->
+    do_with_node_name_label(ResL, aggre_or_zip_init_acc()).
+
+%% Same traversal as do_aggre_cluster/2: results that are not
+%% `{ok, {Node, #{authn := _, authz := _}}}` are skipped.
+do_with_node_name_label([], AccIn) ->
+    AccIn;
+do_with_node_name_label(
+    [{ok, {NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
+    #{authn := AuthnAcc, authz := AuthzAcc} = AccIn
+) ->
+    do_with_node_name_label(
+        Rest,
+        AccIn#{
+            authn => zip_with_node_name(NodeName, NodeAuthnMetrics, AuthnAcc),
+            authz => zip_with_node_name(NodeName, NodeAuthzMetrics, AuthzAcc)
+        }
+    );
+do_with_node_name_label([{_, _} | Rest], AccIn) ->
+    do_with_node_name_label(Rest, AccIn).
+
+%% Adds one node's points, per metric name, to the accumulated map.
+zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
+    lists:foldl(
+        fun(K, AccIn) ->
+            NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
+            AccIn#{K => NAccL}
+        end,
+        AccIn0,
+        maps:keys(NodeMetrics)
+    ).
+ +do_zip_with_node_name(NodeName, NodeMetrics, AccL) -> + lists:foldl( + fun({Labels, Metric}, AccIn) -> + NLabels = [{node_name, NodeName} | Labels], + [{NLabels, Metric} | AccIn] + end, + AccL, + NodeMetrics + ). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper funcs + authenticator_id(Authn) -> emqx_authn_chains:authenticator_id(Authn). @@ -398,69 +600,11 @@ boolean_to_number(false) -> 0. status_to_number(connected) -> 1; status_to_number(stopped) -> 0. -zip_auth_metrics(AuthDataType, K, V, Acc) -> - LabelK = label_key(AuthDataType), - UserOrRuleD = user_rule_data(AuthDataType), - do_zip_auth_metrics(LabelK, UserOrRuleD, K, V, Acc). - -do_zip_auth_metrics(LabelK, UserOrRuleD, Key, Points, [] = _AccIn) -> - lists:foldl( - fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK -> - %% for initialized empty AccIn - %% The following fields will be put into Result - %% For Authn: - %% `id`, `emqx_authn_users_count` - %% For Authz: - %% `type`, `emqx_authz_rules_count` - Point = (users_or_rule_count(LabelK, LabelV, UserOrRuleD))#{ - LabelK => LabelV, Key => Metric - }, - [Point | AccIn2] - end, - [], - Points - ); -do_zip_auth_metrics(LabelK, _UserOrRuleD, Key, Points, AllResultedAcc) -> - ThisKeyResult = lists:foldl( - fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK -> - [#{LabelK => Id, Key => Metric} | AccIn2] - end, - [], - Points - ), - lists:zipwith( - fun(AllResulted, ThisKeyMetricOut) -> - maps:merge(AllResulted, ThisKeyMetricOut) - end, - AllResultedAcc, - ThisKeyResult - ). - -auth_data(authn) -> authn_data(); -auth_data(authz) -> authz_data(). - -label_key(authn) -> id; -label_key(authz) -> type. - -user_rule_data(authn) -> authn_users_count_data(); -user_rule_data(authz) -> authz_rules_count_data(). 
- -users_or_rule_count(id, Id, #{emqx_authn_users_count := Points} = _AuthnUsersD) -> - case lists:keyfind([{id, Id}], 1, Points) of - {_, Metric} -> - #{emqx_authn_users_count => Metric}; - false -> - #{} - end; -users_or_rule_count(type, Type, #{emqx_authz_rules_count := Points} = _AuthzRulesD) -> - case lists:keyfind([{type, Type}], 1, Points) of - {_, Metric} -> - #{emqx_authz_rules_count => Metric}; - false -> - #{} - end; -users_or_rule_count(_, _, _) -> - #{}. - metric_names(MetricWithType) when is_list(MetricWithType) -> [Name || {Name, _Type} <- MetricWithType]. + +aggre_or_zip_init_acc() -> + #{ + authn => maps:from_keys(authn_metric_names(), []), + authz => maps:from_keys(authz_metric_names(), []) + }. diff --git a/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v2.erl b/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v2.erl new file mode 100644 index 000000000..e3f9b0a26 --- /dev/null +++ b/apps/emqx_prometheus/src/proto/emqx_prometheus_proto_v2.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_prometheus_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + start/1, + stop/1, + + raw_prom_data/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). 
+
+introduced_in() ->
+    "5.5.0".
+
+-spec start([node()]) -> emqx_rpc:multicall_result().
+start(Nodes) ->
+    rpc:multicall(Nodes, emqx_prometheus, do_start, [], 5000).
+
+-spec stop([node()]) -> emqx_rpc:multicall_result().
+stop(Nodes) ->
+    rpc:multicall(Nodes, emqx_prometheus, do_stop, [], 5000).
+
+%% Runs `M:F(A)` on every node in `Nodes` (through
+%% emqx_prometheus_api:lookup_from_local_nodes/3) with a 5000 ms timeout and
+%% returns one erpc result per node, in the order of `Nodes`.
+%% `A` is the argument list handed to `erlang:apply/3`, so it is typed as a
+%% list of terms rather than as a single key.
+-spec raw_prom_data([node()], module(), atom(), [term()]) ->
+    emqx_rpc:erpc_multicall(term()).
+raw_prom_data(Nodes, M, F, A) ->
+    erpc:multicall(
+        Nodes,
+        emqx_prometheus_api,
+        lookup_from_local_nodes,
+        [M, F, A],
+        5000
+    ).