refactor(prom): prom_auth and prom_di as prom_cluster behaviour

This commit is contained in:
JimMoen 2024-01-19 15:52:09 +08:00
parent b480c5b371
commit c6c1a7fc28
No known key found for this signature in database
3 changed files with 113 additions and 113 deletions

View File

@ -24,8 +24,12 @@
-export([collect/1]).
%% for bpapi
-behaviour(emqx_prometheus_cluster).
-export([
fetch_metric_data_from_local_node/0
fetch_data_from_local_node/0,
fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0,
logic_sum_metrics/0
]).
%% %% @private
@ -127,7 +131,7 @@ deregister_cleanup(_) -> ok.
Callback :: prometheus_collector:collect_mf_callback().
%% erlfmt-ignore
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
RawData = raw_data(?GET_PROM_DATA_MODE()),
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)),
ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)),
ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)),
@ -139,8 +143,7 @@ collect_mf(_, _) ->
%% @private
collect(<<"json">>) ->
RawData = raw_data(?GET_PROM_DATA_MODE()),
%% TODO: merge node name in json format
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
#{
emqx_authn => collect_json_data(?MG(authn, RawData)),
emqx_authz => collect_json_data(?MG(authz, RawData)),
@ -159,37 +162,28 @@ add_collect_family(Name, Data, Callback, Type) ->
collect_metrics(Name, Metrics) ->
collect_auth(Name, Metrics).
%% @private
fetch_metric_data_from_local_node() ->
%% behaviour
fetch_data_from_local_node() ->
{node(self()), #{
authn => authn_data(),
authz => authz_data()
}}.
fetch_cluster_consistented_metric_data() ->
fetch_cluster_consistented_data() ->
#{
authn_users_count => authn_users_count_data(),
authz_rules_count => authz_rules_count_data(),
banned_count => banned_count_data()
}.
%% raw data for different format modes
raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()),
maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
%% then fold from all nodes
AllNodesMetrics = with_node_name_label(all_nodes_metrics()),
maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(?PROM_DATA_MODE__NODE) ->
{_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()).
aggre_or_zip_init_acc() ->
#{
authn => maps:from_keys(authn_metric_names(), []),
authz => maps:from_keys(authz_metric_names(), [])
}.
all_nodes_metrics() ->
Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, ?MODULE, fetch_metric_data_from_local_node, []
).
logic_sum_metrics() ->
?LOGICAL_SUM_METRIC_NAMES.
%%--------------------------------------------------------------------
%% Collector
@ -286,7 +280,7 @@ lookup_authn_metrics_local(Id) ->
case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
#{
emqx_authn_status => emqx_prometheus_utils:status_to_number(Status),
emqx_authn_status => emqx_prometheus_cluster:status_to_number(Status),
emqx_authn_nomatch => ?MG0(nomatch, Counters),
emqx_authn_total => ?MG0(total, Counters),
emqx_authn_success => ?MG0(success, Counters),
@ -297,7 +291,7 @@ lookup_authn_metrics_local(Id) ->
end.
authn_metric_names() ->
emqx_prometheus_utils:metric_names(?AUTHNS_WITH_TYPE).
emqx_prometheus_cluster:metric_names(?AUTHNS_WITH_TYPE).
%%====================
%% Authn users count
@ -364,7 +358,7 @@ lookup_authz_metrics_local(Type) ->
case emqx_authz_api_sources:lookup_from_local_node(Type) of
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
#{
emqx_authz_status => emqx_prometheus_utils:status_to_number(Status),
emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status),
emqx_authz_nomatch => ?MG0(nomatch, Counters),
emqx_authz_total => ?MG0(total, Counters),
emqx_authz_success => ?MG0(success, Counters),
@ -375,7 +369,7 @@ lookup_authz_metrics_local(Type) ->
end.
authz_metric_names() ->
emqx_prometheus_utils:metric_names(?AUTHZS_WITH_TYPE).
emqx_prometheus_cluster:metric_names(?AUTHZS_WITH_TYPE).
%%====================
%% Authz rules count
@ -418,7 +412,7 @@ banned_count_data() ->
%% merge / zip formatting funcs for type `application/json`
collect_json_data(Data) ->
emqx_prometheus_utils:collect_json_data(Data, fun zip_json_auth_metrics/3).
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_auth_metrics/3).
collect_banned_data() ->
#{emqx_banned_count => banned_count_data()}.
@ -440,7 +434,7 @@ zip_json_auth_metrics(Key, Points, [] = _AccIn) ->
Points
);
zip_json_auth_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points),
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
users_or_rule_count(#{id := Id}) ->
@ -462,14 +456,6 @@ users_or_rule_count(#{type := Type}) ->
users_or_rule_count(_) ->
#{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `text/plain`
aggre_cluster(ResL) ->
emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()).
with_node_name_label(ResL) ->
emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
@ -488,12 +474,6 @@ mnesia_size(Tab) ->
mnesia:table_info(Tab, size).
do_metric(emqx_authn_enable, #{enable := B}, _) ->
emqx_prometheus_utils:boolean_to_number(B);
emqx_prometheus_cluster:boolean_to_number(B);
do_metric(K, _, Metrics) ->
?MG0(K, Metrics).
aggre_or_zip_init_acc() ->
#{
authn => maps:from_keys(authn_metric_names(), []),
authz => maps:from_keys(authz_metric_names(), [])
}.

View File

@ -13,9 +13,13 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_utils).
-module(emqx_prometheus_cluster).
-include("emqx_prometheus.hrl").
-export([
raw_data/2,
collect_json_data/2,
aggre_cluster/3,
@ -28,9 +32,34 @@
metric_names/1
]).
-callback fetch_cluster_consistented_data() -> map().
-callback fetch_data_from_local_node() -> {node(), map()}.
-callback aggre_or_zip_init_acc() -> map().
-define(MG(K, MAP), maps:get(K, MAP)).
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
AllNodesMetrics = aggre_cluster(Module),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
AllNodesMetrics = with_node_name_label(Module),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(AllNodesMetrics, Cluster);
raw_data(Module, ?PROM_DATA_MODE__NODE) ->
{_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(),
Cluster = Module:fetch_cluster_consistented_data(),
maps:merge(LocalNodeMetrics, Cluster).
metrics_data_from_all_nodes(Module) ->
Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, Module, fetch_data_from_local_node, []
).
collect_json_data(Data, Func) when is_function(Func, 3) ->
maps:fold(
fun(K, V, Acc) ->
@ -42,6 +71,17 @@ collect_json_data(Data, Func) when is_function(Func, 3) ->
collect_json_data(_, _) ->
error(badarg).
aggre_cluster(Module) ->
do_aggre_cluster(
Module:logic_sum_metrics(),
metrics_data_from_all_nodes(Module),
Module:aggre_or_zip_init_acc()
).
with_node_name_label(Module) ->
ResL = metrics_data_from_all_nodes(Module),
do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()).
aggre_cluster(LogicSumKs, ResL, Init) ->
do_aggre_cluster(LogicSumKs, ResL, Init).
@ -58,7 +98,6 @@ do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
AccIn,
NodeMetric
)
%% merge_node_and_acc()
);
do_aggre_cluster(LogicSumKs, [{_, _} | Rest], AccIn) ->
do_aggre_cluster(LogicSumKs, Rest, AccIn).

View File

@ -29,8 +29,12 @@
]).
%% for bpapi
-behaviour(emqx_prometheus_cluster).
-export([
fetch_metric_data_from_local_node/0
fetch_data_from_local_node/0,
fetch_cluster_consistented_data/0,
aggre_or_zip_init_acc/0,
logic_sum_metrics/0
]).
-export([add_collect_family/4]).
@ -120,6 +124,37 @@
emqx_connector_status
]).
%%--------------------------------------------------------------------
%% Callback for emqx_prometheus_cluster
%%--------------------------------------------------------------------
fetch_data_from_local_node() ->
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
{node(self()), #{
rule_specific_data => rule_specific_data(Rules),
action_specific_data => action_specific_data(Bridges),
connector_specific_data => connector_specific_data(Bridges)
}}.
fetch_cluster_consistented_data() ->
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
(maybe_collect_schema_registry())#{
rules_data => rules_data(Rules),
connectors_data => connectors_data(Bridges)
}.
aggre_or_zip_init_acc() ->
#{
rule_specific_data => maps:from_keys(rule_specific_metric_names(), []),
action_specific_data => maps:from_keys(action_specific_metric_names(), []),
connector_specific_data => maps:from_keys(connectr_specific_metric_names(), [])
}.
logic_sum_metrics() ->
?LOGICAL_SUM_METRIC_NAMES.
%%--------------------------------------------------------------------
%% Collector API
%%--------------------------------------------------------------------
@ -132,7 +167,7 @@ deregister_cleanup(_) -> ok.
_Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback().
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
RawData = raw_data(?GET_PROM_DATA_MODE()),
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
%% Data Integration Overview
ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)),
@ -157,7 +192,7 @@ collect_mf(_, _) ->
%% @private
collect(<<"json">>) ->
RawData = raw_data(?GET_PROM_DATA_MODE()),
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
#{
@ -183,24 +218,6 @@ add_collect_family(Name, Data, Callback, Type) ->
collect_metrics(Name, Metrics) ->
collect_di(Name, Metrics).
%% @private
fetch_metric_data_from_local_node() ->
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
{node(self()), #{
rule_specific_data => rule_specific_data(Rules),
action_specific_data => action_specific_data(Bridges),
connector_specific_data => connector_specific_data(Bridges)
}}.
fetch_cluster_consistented_metric_data() ->
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
(maybe_collect_schema_registry())#{
rules_data => rules_data(Rules),
connectors_data => connectors_data(Bridges)
}.
-if(?EMQX_RELEASE_EDITION == ee).
maybe_collect_family_schema_registry(Callback) ->
ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()),
@ -216,24 +233,6 @@ maybe_collect_schema_registry() ->
#{}.
-endif.
%% raw data for different format modes
raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()),
maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
%% then fold from all nodes
AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()),
maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(?PROM_DATA_MODE__NODE) ->
{_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()).
metrics_data_from_all_nodes() ->
Nodes = mria:running_nodes(),
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
Nodes, ?MODULE, fetch_metric_data_from_local_node, []
).
%%--------------------------------------------------------------------
%% Collector
%%--------------------------------------------------------------------
@ -398,7 +397,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
#{counters := Counters} ->
#{
emqx_rule_enable => emqx_prometheus_utils:boolean_to_number(Bool),
emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
emqx_rule_matched => ?MG(matched, Counters),
emqx_rule_failed => ?MG(failed, Counters),
emqx_rule_passed => ?MG(passed, Counters),
@ -415,7 +414,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
end.
rule_specific_metric_names() ->
emqx_prometheus_utils:metric_names(?RULES_SPECIFIC_WITH_TYPE).
emqx_prometheus_cluster:metric_names(?RULES_SPECIFIC_WITH_TYPE).
%%====================
%% Specific Action
@ -469,7 +468,7 @@ get_bridge_metric(Type, Name) ->
end.
action_specific_metric_names() ->
emqx_prometheus_utils:metric_names(?ACTION_SPECIFIC_WITH_TYPE).
emqx_prometheus_cluster:metric_names(?ACTION_SPECIFIC_WITH_TYPE).
%%====================
%% Specific Connector
@ -501,12 +500,12 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData),
#{
emqx_connector_enable => emqx_prometheus_utils:boolean_to_number(Enabled),
emqx_connector_status => emqx_prometheus_utils:status_to_number(Status)
emqx_connector_enable => emqx_prometheus_cluster:boolean_to_number(Enabled),
emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status)
}.
connectr_specific_metric_names() ->
emqx_prometheus_utils:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
emqx_prometheus_cluster:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
%%--------------------------------------------------------------------
%% Collect functions
@ -521,19 +520,19 @@ collect_data_integration_overview(Rules, Bridges) ->
M1 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
#{},
emqx_prometheus_utils:metric_names(?RULES_WITH_TYPE)
emqx_prometheus_cluster:metric_names(?RULES_WITH_TYPE)
),
M2 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
#{},
emqx_prometheus_utils:metric_names(?CONNECTORS_WITH_TYPE)
emqx_prometheus_cluster:metric_names(?CONNECTORS_WITH_TYPE)
),
M3 = maybe_collect_schema_registry(),
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
collect_json_data(Data) ->
emqx_prometheus_utils:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
%% for initialized empty AccIn
%% The following fields will be put into Result
@ -555,23 +554,5 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
Points
);
zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points),
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `text/plain`
aggre_cluster(ResL) ->
emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()).
with_node_name_label(ResL) ->
emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper funcs
aggre_or_zip_init_acc() ->
#{
rule_specific_data => maps:from_keys(rule_specific_metric_names(), []),
action_specific_data => maps:from_keys(action_specific_metric_names(), []),
connector_specific_data => maps:from_keys(connectr_specific_metric_names(), [])
}.