From 8f7964f435306e45ecc7d02dda3023d9b1fb2ea7 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 15 Jan 2024 14:41:26 +0800 Subject: [PATCH] feat(prom): data integration metrics in josn format --- .../src/emqx_prometheus_auth.erl | 3 +- .../src/emqx_prometheus_data_integration.erl | 159 ++++++++++++++---- 2 files changed, 130 insertions(+), 32 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index c7c65b2cb..06d6246f1 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -95,7 +95,6 @@ collect_mf(_, _) -> %% @private collect(<<"json">>) -> - %% TODO #{ emqx_authn => collect_auth_data(authn), emqx_authz => collect_auth_data(authz), diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 3546697cc..3cdc3a01c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,24 +26,7 @@ -export([add_collect_family/4]). --export([ - rules/0, - rules_data/1, - actions/0, - actions_data/1, - actions_exec_count/0, - actions_exec_count_data/0, - schema_registry/0, - schema_registry_data/0, - connectors/0, - connectors_data/1, - rule_specific/0, - rule_specific_data/1, - action_specific/0, - action_specific_data/1, - connector_specific/0, - connector_specific_data/1 -]). +-export([actions_exec_count/0, actions_exec_count_data/0]). -include("emqx_prometheus.hrl"). -include_lib("prometheus/include/prometheus.hrl"). @@ -81,34 +64,97 @@ deregister_cleanup(_) -> ok. -spec collect_mf(_Registry, Callback) -> ok when _Registry :: prometheus_registry:registry(), Callback :: prometheus_collector:collect_mf_callback(). -%% erlfmt-ignore collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> Rules = emqx_rule_engine:get_rules(), - Bridges =emqx_bridge:list(), + Bridges = emqx_bridge:list(), + %% Data Integration Overview _ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], _ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], - _ = [add_collect_family(Name, connectors_data(Bridges), Callback, gauge) || Name <- connectors()], - _ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()], - _ = [add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) || Name <- action_specific()], - _ = [add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) || Name <- connector_specific()], + _ = [ + add_collect_family(Name, connectors_data(Bridges), Callback, gauge) + || Name <- connectors() + ], ok = maybe_collect_family_schema_registry(Callback), + + %% Rule Specific + _ = [ + add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) + || Name <- rule_specific() + ], + + %% Action Specific + _ = [ + add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) + || Name <- action_specific() + ], + + %% Connector Specific + _ = [ + add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) + || Name <- connector_specific() + ], + ok; collect_mf(_, _) -> ok. %% @private collect(<<"json">>) -> - %% TODO - #{}; + Rules = emqx_rule_engine:get_rules(), + Bridges = emqx_bridge:list(), + #{ + data_integration_overview => collect_data_integration(overview, {Rules, Bridges}), + rules => collect_data_integration(rules, Rules), + actions => collect_data_integration(actions, Bridges), + connectors => collect_data_integration(connectors, Bridges) + }; collect(<<"prometheus">>) -> prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + add_collect_family(Name, Data, Callback, Type) -> Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). collect_metrics(Name, Metrics) -> collect_di(Name, Metrics). +collect_data_integration(overview, {Rules, Bridges}) -> + RulesD = rules_data(Rules), + ActionsD = actions_data(Rules), + ConnectorsD = connectors_data(Bridges), + + M1 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, rules()), + M2 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end, #{}, actions()), + M3 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, connectors()), + M4 = maybe_collect_schema_registry(), + + lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]); +collect_data_integration(Type = rules, Rules) -> + maps:fold( + fun(K, V, Acc) -> + zip_metrics(Type, K, V, Acc) + end, + [], + di_data(Type, Rules) + ); +collect_data_integration(Type = actions, Rules) -> + maps:fold( + fun(K, V, Acc) -> + zip_metrics(Type, K, V, Acc) + end, + [], + di_data(Type, Rules) + ); +collect_data_integration(Type = connectors, Bridges) -> + maps:fold( + fun(K, V, Acc) -> + zip_metrics(Type, K, V, Acc) + end, + [], + di_data(Type, Bridges) + ). + -if(?EMQX_RELEASE_EDITION == ee). maybe_collect_family_schema_registry(Callback) -> _ = [ @@ -116,9 +162,15 @@ maybe_collect_family_schema_registry(Callback) -> || Name <- schema_registry() ], ok. + +maybe_collect_schema_registry() -> + schema_registry_data(). -else. maybe_collect_family_schema_registry(_) -> ok. + +maybe_collect_schema_registry() -> + #{}. -endif. %%-------------------------------------------------------------------- @@ -307,7 +359,7 @@ actions_exec_count() -> ]. actions_exec_count_data() -> - []. + #{}. %%==================== %% Schema Registry @@ -485,8 +537,6 @@ get_bridge_metric(Type, Name) -> } end. -%% TODO: Bridge V2 - %%==================== %% Specific Connector %% With connector_id: `{type}:{name}` as label key: `connector_id` @@ -537,3 +587,52 @@ boolean_to_number(false) -> 0. status_to_number(connected) -> 1; status_to_number(disconnected) -> 0. + +zip_metrics(Type, K, V, Acc) -> + LabelK = label_key(Type), + do_zip_metrics(LabelK, K, V, Acc). + +do_zip_metrics(LabelK, Key, Points, [] = _AccIn) -> + lists:foldl( + fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK -> + %% for initialized empty AccIn + %% The following fields will be put into Result + %% For Rules: + %% `id` => [RULE_ID] + %% For Actions + %% `id` => [ACTION_ID] + %% FOR Connectors + %% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID + %% formatted with {type}:{name} + Point = + #{ + LabelK => LabelV, Key => Metric + }, + [Point | AccIn2] + end, + [], + Points + ); +do_zip_metrics(LabelK, Key, Points, AllResultedAcc) -> + ThisKeyResult = lists:foldl( + fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK -> + [#{LabelK => Id, Key => Metric} | AccIn2] + end, + [], + Points + ), + lists:zipwith( + fun(AllResulted, ThisKeyMetricOut) -> + maps:merge(AllResulted, ThisKeyMetricOut) + end, + AllResultedAcc, + ThisKeyResult + ). + +di_data(rules, Rules) -> rule_specific_data(Rules); +di_data(actions, Bridges) -> action_specific_data(Bridges); +di_data(connectors, Bridges) -> connector_specific_data(Bridges). + +label_key(rules) -> id; +label_key(actions) -> id; +label_key(connectors) -> id.