refactor: use utils func for prom_auth and prom_di

This commit is contained in:
JimMoen 2024-01-18 17:49:24 +08:00
parent c3da792323
commit 8cb12c6a74
No known key found for this signature in database
3 changed files with 214 additions and 306 deletions

View File

@ -28,6 +28,11 @@
fetch_metric_data_from_local_node/0
]).
%% %% @private
-export([
zip_json_auth_metrics/3
]).
-include("emqx_prometheus.hrl").
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
-include_lib("prometheus/include/prometheus.hrl").
@ -282,7 +287,7 @@ lookup_authn_metrics_local(Id) ->
case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
#{
emqx_authn_status => status_to_number(Status),
emqx_authn_status => emqx_prometheus_utils:status_to_number(Status),
emqx_authn_nomatch => ?MG0(nomatch, Counters),
emqx_authn_total => ?MG0(total, Counters),
emqx_authn_success => ?MG0(success, Counters),
@ -293,7 +298,7 @@ lookup_authn_metrics_local(Id) ->
end.
authn_metric_names() ->
metric_names(?AUTHNS_WITH_TYPE).
emqx_prometheus_utils:metric_names(?AUTHNS_WITH_TYPE).
%%====================
%% Authn users count
@ -360,7 +365,7 @@ lookup_authz_metrics_local(Type) ->
case emqx_authz_api_sources:lookup_from_local_node(Type) of
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
#{
emqx_authz_status => status_to_number(Status),
emqx_authz_status => emqx_prometheus_utils:status_to_number(Status),
emqx_authz_nomatch => ?MG0(nomatch, Counters),
emqx_authz_total => ?MG0(total, Counters),
emqx_authz_success => ?MG0(success, Counters),
@ -371,7 +376,7 @@ lookup_authz_metrics_local(Type) ->
end.
authz_metric_names() ->
metric_names(?AUTHZS_WITH_TYPE).
emqx_prometheus_utils:metric_names(?AUTHZS_WITH_TYPE).
%%====================
%% Authz rules count
@ -412,56 +417,35 @@ banned_count_data() ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json`
collect_json_data(Data) ->
maps:fold(
fun(K, V, Acc) ->
zip_json_metrics(K, V, Acc)
end,
[],
Data
).
emqx_prometheus_utils:collect_json_data(Data, fun zip_json_auth_metrics/3).
collect_banned_data() ->
#{emqx_banned_count => banned_count_data()}.
zip_json_metrics(Key, Points, [] = _AccIn) ->
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Authn:
%% `id`, `emqx_authn_users_count`
%% For Authz:
%% `type`, `emqx_authz_rules_count`n
zip_json_auth_metrics(Key, Points, [] = _AccIn) ->
lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Authn:
%% `id`, `emqx_authn_users_count`
%% For Authz:
%% `type`, `emqx_authz_rules_count`n
Point = (maps:merge(LablesKVMap, users_or_rule_count(LablesKVMap)))#{Key => Metric},
[Point | AccIn2]
end,
[],
Points
);
zip_json_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end,
[],
Points
),
lists:zipwith(
fun(AllResulted, ThisKeyMetricOut) ->
maps:merge(AllResulted, ThisKeyMetricOut)
end,
AllResultedAcc,
ThisKeyResult
).
user_rule_data(authn) -> authn_users_count_data();
user_rule_data(authz) -> authz_rules_count_data().
zip_json_auth_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
users_or_rule_count(#{id := Id}) ->
#{emqx_authn_users_count := Points} = user_rule_data(authn),
#{emqx_authn_users_count := Points} = authn_users_count_data(),
case lists:keyfind([{id, Id}], 1, Points) of
{_, Metric} ->
#{emqx_authn_users_count => Metric};
@ -469,7 +453,7 @@ users_or_rule_count(#{id := Id}) ->
#{}
end;
users_or_rule_count(#{type := Type}) ->
#{emqx_authz_rules_count := Points} = user_rule_data(authz),
#{emqx_authz_rules_count := Points} = authz_rules_count_data(),
case lists:keyfind([{type, Type}], 1, Points) of
{_, Metric} ->
#{emqx_authz_rules_count => Metric};
@ -482,95 +466,10 @@ users_or_rule_count(_) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `text/plain`
aggre_cluster(ResL) ->
do_aggre_cluster(ResL, aggre_or_zip_init_acc()).
do_aggre_cluster([], AccIn) ->
AccIn;
do_aggre_cluster(
[{ok, {_NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
#{authn := AuthnAcc, authz := AuthzAcc} = AccIn
) ->
do_aggre_cluster(
Rest,
AccIn#{
authn => do_aggre_metric(NodeAuthnMetrics, AuthnAcc),
authz => do_aggre_metric(NodeAuthzMetrics, AuthzAcc)
}
);
do_aggre_cluster([{_, _} | Rest], AccIn) ->
do_aggre_cluster(Rest, AccIn).
do_aggre_metric(NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_aggre_metric(K, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_aggre_metric(K, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NMetric =
case lists:member(K, ?LOGICAL_SUM_METRIC_NAMES) of
true ->
logic_sum(Metric, ?PG0(Labels, AccIn));
false ->
Metric + ?PG0(Labels, AccIn)
end,
[{Labels, NMetric} | AccIn]
end,
AccL,
NodeMetrics
).
logic_sum(N1, N2) when
(N1 > 0 andalso N2 > 0)
->
1;
logic_sum(_, _) ->
0.
emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()).
with_node_name_label(ResL) ->
do_with_node_name_label(ResL, aggre_or_zip_init_acc()).
do_with_node_name_label([], AccIn) ->
AccIn;
do_with_node_name_label(
[{ok, {NodeName, #{authn := NodeAuthnMetrics, authz := NodeAuthzMetrics}}} | Rest],
#{authn := AuthnAcc, authz := AuthzAcc} = AccIn
) ->
do_with_node_name_label(
Rest,
AccIn#{
authn => zip_with_node_name(NodeName, NodeAuthnMetrics, AuthnAcc),
authz => zip_with_node_name(NodeName, NodeAuthzMetrics, AuthzAcc)
}
);
do_with_node_name_label([{_, _} | Rest], AccIn) ->
do_with_node_name_label(Rest, AccIn).
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NLabels = [{node_name, NodeName} | Labels],
[{NLabels, Metric} | AccIn]
end,
AccL,
NodeMetrics
).
emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
@ -590,19 +489,10 @@ mnesia_size(Tab) ->
mnesia:table_info(Tab, size).
do_metric(emqx_authn_enable, #{enable := B}, _) ->
boolean_to_number(B);
emqx_prometheus_utils:boolean_to_number(B);
do_metric(K, _, Metrics) ->
?MG0(K, Metrics).
boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
status_to_number(stopped) -> 0.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].
aggre_or_zip_init_acc() ->
#{
authn => maps:from_keys(authn_metric_names(), []),

View File

@ -24,6 +24,10 @@
-export([collect/1]).
-export([
zip_json_data_integration_metrics/3
]).
%% for bpapi
-export([
fetch_metric_data_from_local_node/0
@ -394,7 +398,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
#{counters := Counters} ->
#{
emqx_rule_enable => boolean_to_number(Bool),
emqx_rule_enable => emqx_prometheus_utils:boolean_to_number(Bool),
emqx_rule_matched => ?MG(matched, Counters),
emqx_rule_failed => ?MG(failed, Counters),
emqx_rule_passed => ?MG(passed, Counters),
@ -411,7 +415,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
end.
rule_specific_metric_names() ->
metric_names(?RULES_SPECIFIC_WITH_TYPE).
emqx_prometheus_utils:metric_names(?RULES_SPECIFIC_WITH_TYPE).
%%====================
%% Specific Action
@ -465,7 +469,7 @@ get_bridge_metric(Type, Name) ->
end.
action_specific_metric_names() ->
metric_names(?ACTION_SPECIFIC_WITH_TYPE).
emqx_prometheus_utils:metric_names(?ACTION_SPECIFIC_WITH_TYPE).
%%====================
%% Specific Connector
@ -497,12 +501,12 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData),
#{
emqx_connector_enable => boolean_to_number(Enabled),
emqx_connector_status => status_to_number(Status)
emqx_connector_enable => emqx_prometheus_utils:boolean_to_number(Enabled),
emqx_connector_status => emqx_prometheus_utils:status_to_number(Status)
}.
connectr_specific_metric_names() ->
metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
emqx_prometheus_utils:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
%%--------------------------------------------------------------------
%% Collect functions
@ -510,7 +514,6 @@ connectr_specific_metric_names() ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json`
collect_data_integration_overview(Rules, Bridges) ->
RulesD = rules_data(Rules),
ConnectorsD = connectors_data(Bridges),
@ -518,199 +521,54 @@ collect_data_integration_overview(Rules, Bridges) ->
M1 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
#{},
metric_names(?RULES_WITH_TYPE)
emqx_prometheus_utils:metric_names(?RULES_WITH_TYPE)
),
M2 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
#{},
metric_names(?CONNECTORS_WITH_TYPE)
emqx_prometheus_utils:metric_names(?CONNECTORS_WITH_TYPE)
),
M3 = maybe_collect_schema_registry(),
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
collect_json_data(Data) ->
maps:fold(
fun(K, V, Acc) ->
zip_json_metrics(K, V, Acc)
end,
[],
Data
).
emqx_prometheus_utils:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
zip_json_metrics(Key, Points, [] = _AccIn) ->
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Rules:
%% `id` => [RULE_ID]
%% For Actions
%% `id` => [ACTION_ID]
%% FOR Connectors
%% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
%% formatted with {type}:{name}
zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Rules:
%% `id` => [RULE_ID]
%% For Actions
%% `id` => [ACTION_ID]
%% FOR Connectors
%% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
%% formatted with {type}:{name}
Point = LablesKVMap#{Key => Metric},
[Point | AccIn2]
end,
[],
Points
);
zip_json_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end,
[],
Points
),
lists:zipwith(
fun(AllResulted, ThisKeyMetricOut) ->
maps:merge(AllResulted, ThisKeyMetricOut)
end,
AllResultedAcc,
ThisKeyResult
).
zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `text/plain`
aggre_cluster(ResL) ->
do_aggre_cluster(ResL, aggre_or_zip_init_acc()).
do_aggre_cluster([], AccIn) ->
AccIn;
do_aggre_cluster(
[
{ok,
{_NodeName, #{
rule_specific_data := NodeRuleMetrics,
action_specific_data := NodeActionMetrics,
connector_specific_data := NodeConnectorMetrics
}}}
| Rest
],
#{
rule_specific_data := RuleAcc,
action_specific_data := ActionAcc,
connector_specific_data := ConnAcc
} = AccIn
) ->
do_aggre_cluster(
Rest,
AccIn#{
%% TODO
rule_specific_data => do_aggre_metric(NodeRuleMetrics, RuleAcc),
action_specific_data => do_aggre_metric(NodeActionMetrics, ActionAcc),
connector_specific_data => do_aggre_metric(NodeConnectorMetrics, ConnAcc)
}
);
do_aggre_cluster([{_, _} | Rest], AccIn) ->
do_aggre_cluster(Rest, AccIn).
do_aggre_metric(NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_aggre_metric(K, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
do_aggre_metric(K, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NMetric =
case lists:member(K, ?LOGICAL_SUM_METRIC_NAMES) of
true ->
logic_sum(Metric, ?PG0(Labels, AccIn));
false ->
Metric + ?PG0(Labels, AccIn)
end,
[{Labels, NMetric} | AccIn]
end,
AccL,
NodeMetrics
).
emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()).
with_node_name_label(ResL) ->
do_with_node_name_label(
ResL,
aggre_or_zip_init_acc()
).
do_with_node_name_label([], AccIn) ->
AccIn;
do_with_node_name_label(
[
{ok,
{NodeName, #{
rule_specific_data := NodeRuleMetrics,
action_specific_data := NodeActionMetrics,
connector_specific_data := NodeConnectorMetrics
}}}
| Rest
],
#{
rule_specific_data := RuleAcc,
action_specific_data := ActionAcc,
connector_specific_data := ConnAcc
} = AccIn
) ->
do_with_node_name_label(
Rest,
AccIn#{
rule_specific_data => zip_with_node_name(NodeName, NodeRuleMetrics, RuleAcc),
action_specific_data => zip_with_node_name(NodeName, NodeActionMetrics, ActionAcc),
connector_specific_data => zip_with_node_name(NodeName, NodeConnectorMetrics, ConnAcc)
}
);
do_with_node_name_label([{_, _} | Rest], AccIn) ->
do_with_node_name_label(Rest, AccIn).
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NLabels = [{node_name, NodeName} | Labels],
[{NLabels, Metric} | AccIn]
end,
AccL,
NodeMetrics
).
emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
status_to_number(disconnected) -> 0.
logic_sum(N1, N2) when
(N1 > 0 andalso N2 > 0)
->
1;
logic_sum(_, _) ->
0.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].
aggre_or_zip_init_acc() ->
#{
rule_specific_data => maps:from_keys(rule_specific_metric_names(), []),

View File

@ -0,0 +1,160 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_utils).
-export([
collect_json_data/2,
aggre_cluster/3,
with_node_name_label/2,
point_to_map_fun/1,
boolean_to_number/1,
status_to_number/1,
metric_names/1
]).
-define(MG(K, MAP), maps:get(K, MAP)).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
collect_json_data(Data, Func) when is_function(Func, 3) ->
maps:fold(
fun(K, V, Acc) ->
Func(K, V, Acc)
end,
[],
Data
);
collect_json_data(_, _) ->
error(badarg).
aggre_cluster(LogicSumKs, ResL, Init) ->
do_aggre_cluster(LogicSumKs, ResL, Init).
do_aggre_cluster(_LogicSumKs, [], AccIn) ->
AccIn;
do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
do_aggre_cluster(
LogicSumKs,
Rest,
maps:fold(
fun(K, V, AccIn0) ->
AccIn0#{K => aggre_metric(LogicSumKs, V, ?MG(K, AccIn0))}
end,
AccIn,
NodeMetric
)
%% merge_node_and_acc()
);
do_aggre_cluster(LogicSumKs, [{_, _} | Rest], AccIn) ->
do_aggre_cluster(LogicSumKs, Rest, AccIn).
aggre_metric(LogicSumKs, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_aggre_metric(
K, LogicSumKs, ?MG(K, NodeMetrics), ?MG(K, AccIn)
),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NMetric =
case lists:member(K, LogicSumKs) of
true ->
logic_sum(Metric, ?PG0(Labels, AccIn));
false ->
Metric + ?PG0(Labels, AccIn)
end,
[{Labels, NMetric} | AccIn]
end,
AccL,
NodeMetrics
).
with_node_name_label(ResL, Init) ->
do_with_node_name_label(ResL, Init).
do_with_node_name_label([], AccIn) ->
AccIn;
do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) ->
do_with_node_name_label(
Rest,
maps:fold(
fun(K, V, AccIn0) ->
AccIn0#{
K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0))
}
end,
AccIn,
NodeMetric
)
);
do_with_node_name_label([{_, _} | Rest], AccIn) ->
do_with_node_name_label(Rest, AccIn).
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
lists:foldl(
fun(K, AccIn) ->
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
AccIn#{K => NAccL}
end,
AccIn0,
maps:keys(NodeMetrics)
).
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
lists:foldl(
fun({Labels, Metric}, AccIn) ->
NLabels = [{node, NodeName} | Labels],
[{NLabels, Metric} | AccIn]
end,
AccL,
NodeMetrics
).
point_to_map_fun(Key) ->
fun({Lables, Metric}, AccIn2) ->
LablesKVMap = maps:from_list(Lables),
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
logic_sum(N1, N2) when
(N1 > 0 andalso N2 > 0)
->
1;
logic_sum(_, _) ->
0.
boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
%% for auth
status_to_number(stopped) -> 0;
%% for data_integration
status_to_number(disconnected) -> 0.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].