From 5158395bcf172c3493b9302d73dcccea63559fa8 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 14 Jan 2024 04:39:36 +0800 Subject: [PATCH] feat(prometheus): data integration prom data --- apps/emqx_conf/src/emqx_conf_schema.erl | 1 + .../src/emqx_prometheus_api.erl | 21 +- .../src/emqx_prometheus_data_integration.erl | 491 ++++++++++++++++++ rel/i18n/emqx_prometheus_api.hocon | 5 + 4 files changed, 516 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 571f5785b..abb2e14e3 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -1107,6 +1107,7 @@ tr_prometheus_collectors(Conf) -> %% emqx collectors emqx_prometheus, {'/prometheus/auth', emqx_prometheus_auth}, + {'/prometheus/data_integration', emqx_prometheus_data_integration}, emqx_prometheus_mria %% builtin vm collectors | prometheus_collectors(Conf) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 280f1aa8d..32cb89177 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -30,7 +30,8 @@ -export([ setting/2, stats/2, - auth/2 + auth/2, + data_integration/2 ]). -define(TAGS, [<<"Monitor">>]). @@ -42,7 +43,8 @@ paths() -> [ "/prometheus", "/prometheus/auth", - "/prometheus/stats" + "/prometheus/stats", + "/prometheus/data_integration" ]. schema("/prometheus") -> @@ -87,6 +89,18 @@ schema("/prometheus/stats") -> responses => #{200 => prometheus_data_schema()} } + }; +schema("/prometheus/data_integration") -> + #{ + 'operationId' => data_integration, + get => + #{ + description => ?DESC(get_prom_data_integration_data), + tags => ?TAGS, + security => security(), + responses => + #{200 => prometheus_data_schema()} + } }. security() -> @@ -121,6 +135,9 @@ stats(get, #{headers := Headers}) -> auth(get, #{headers := Headers}) -> collect(emqx_prometheus_auth, Headers). +data_integration(get, #{headers := Headers}) -> + collect(emqx_prometheus_data_integration, Headers). + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl new file mode 100644 index 000000000..4c679b842 --- /dev/null +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -0,0 +1,491 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_prometheus_data_integration). + +-export([ + deregister_cleanup/1, + collect_mf/2, + collect_metrics/2 +]). + +-export([collect/1]). + +-export([add_collect_family/4]). + +-export([ + rules/0, + rules_data/1, + actions/0, + actions_data/1, + actions_exec_count/0, + actions_exec_count_data/0, + schema_registry/0, + schema_registry_data/0, + connectors/0, + connectors_data/0, + rule_specific/0, + rule_specific_data/1, + action_specific/0, + action_specific_data/0, + connector_specific/0, + connector_specific_data/0 +]). + +-include("emqx_prometheus.hrl"). +-include_lib("prometheus/include/prometheus.hrl"). + +-import( + prometheus_model_helpers, + [ + create_mf/5, + gauge_metric/1, + gauge_metrics/1 + ] +). + +%% Please don't remove this attribute, prometheus uses it to +%% automatically register collectors. +-behaviour(prometheus_collector). + +%%-------------------------------------------------------------------- +%% Macros +%%-------------------------------------------------------------------- + +-define(METRIC_NAME_PREFIX, "emqx_data_integration_"). + +-define(MG(K, MAP), maps:get(K, MAP)). +-define(MG0(K, MAP), maps:get(K, MAP, 0)). + +%%-------------------------------------------------------------------- +%% Collector API +%%-------------------------------------------------------------------- + +%% @private +deregister_cleanup(_) -> ok. + +%% @private +-spec collect_mf(_Registry, Callback) -> ok when + _Registry :: prometheus_registry:registry(), + Callback :: prometheus_collector:collect_mf_callback(). +%% erlfmt-ignore +collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> + Rules = emqx_rule_engine:get_rules(), + _ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], + _ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], + _ = [add_collect_family(Name, schema_registry_data(), Callback, gauge) || Name <- schema_registry()], + _ = [add_collect_family(Name, connectors_data(), Callback, gauge) || Name <- connectors()], + _ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()], + _ = [add_collect_family(Name, action_specific_data(), Callback, gauge) || Name <- action_specific()], + + ok; +collect_mf(_, _) -> + ok. + +%% @private +collect(<<"json">>) -> + %% TODO + #{}; +collect(<<"prometheus">>) -> + prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY). + +add_collect_family(Name, Data, Callback, Type) -> + Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). + +collect_metrics(Name, Metrics) -> + collect_di(Name, Metrics). + +%%-------------------------------------------------------------------- +%% Collector +%%-------------------------------------------------------------------- + +%%======================================== +%% Data Integration Overview +%%======================================== + +%%==================== +%% All Rules +%% Rules +collect_di(K = emqx_rule_count, Data) -> + gauge_metric(?MG(K, Data)); +collect_di(K = emqx_rules_matched_rate, Data) -> + gauge_metric(?MG(K, Data)); +collect_di(K = emqx_rules_matched_rate_last5m, Data) -> + gauge_metric(?MG(K, Data)); +%%==================== +%% All Actions +collect_di(K = emqx_rules_actions_rate, Data) -> + gauge_metric(?MG(K, Data)); +collect_di(K = emqx_rules_actions_rate_last5m, Data) -> + gauge_metric(?MG(K, Data)); +%%==================== +%% Schema Registry +collect_di(K = emqx_schema_registry_count, Data) -> + gauge_metric(?MG(K, Data)); +%%==================== +%% Connectors +collect_di(K = emqx_connector_count, Data) -> + gauge_metric(?MG(K, Data)); +%%======================================== +%% Data Integration for Specific: Rule && Action && Connector +%%======================================== + +%%==================== +%% Specific Rule +collect_di(K = emqx_rule_matched, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_passed, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed_exception, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed_no_result, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_total, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_success, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed_unknown, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_matched_rate, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_matched_rate_last5m, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_matched_rate_max, Data) -> + gauge_metrics(?MG(K, Data)); +%%==================== +%% Specific Action + +collect_di(K = emqx_action_matched, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_success, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_failed, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_rate, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_inflight, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_received, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_late_reply, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried_success, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried_failed, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_resource_stopped, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_resource_not_found, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_queue_full, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_other, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_expired, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_queuing, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_rate_last5m, Data) -> + gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_rate_max, Data) -> + gauge_metrics(?MG(K, Data)). + +%%==================== +%% Specific Connector + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +%%======================================== +%% Data Integration Overview +%%======================================== + +%%==================== +%% All Rules + +rules() -> + [ + emqx_rule_count, + emqx_rules_matched_rate, + emqx_rules_matched_rate_last5m + ]. + +-define(RULE_TAB, emqx_rule_engine). + +rules_data(Rules) -> + Rate = lists:foldl( + fun( + #{id := Id}, + #{emqx_rules_matched_rate := Rate, emqx_rules_matched_rate_last5m := RateLast5m} = AccIn + ) -> + RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id), + AccIn#{ + emqx_rules_matched_rate => Rate + + emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0), + emqx_rules_matched_rate_last5m => RateLast5m + + emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0) + } + end, + _InitAcc = maps:from_keys(rules(), 0), + Rules + ), + Rate#{emqx_rule_count => ets:info(?RULE_TAB, size)}. + +%%==================== +%% All Actions + +actions() -> + [ + emqx_rules_actions_rate, + emqx_rules_actions_rate_last5m + ]. + +actions_data(Rules) -> + lists:foldl( + fun( + #{id := Id}, + #{emqx_rules_actions_rate := Rate, emqx_rules_actions_rate_last5m := RateLast5m} = + _AccIn + ) -> + RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id), + _AccIn#{ + emqx_rules_actions_rate => Rate + + emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0), + emqx_rules_actions_rate_last5m => RateLast5m + + emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0) + } + end, + _InitAcc = maps:from_keys(actions(), 0), + Rules + ). + +actions_exec_count() -> + [ + emqx_action_sink, + emqx_action_source + ]. + +actions_exec_count_data() -> + []. + +%%==================== +%% Schema Registry + +schema_registry() -> + [ + emqx_schema_registry_count + ]. + +schema_registry_data() -> + #{ + emqx_schema_registry_count => erlang:map_size(emqx_schema_registry:list_schemas()) + }. + +%%==================== +%% Connectors + +connectors() -> + [ + emqx_connector_count + ]. + +connectors_data() -> + #{ + emqx_connector_count => + lists:foldl( + fun(List, AccIn) -> erlang:length(List) + AccIn end, + 0, + [ + emqx_connector:list(), emqx_bridge:list(), emqx_bridge_v2:list() + ] + ) + }. + +%%======================================== +%% Data Integration for Specific: Rule && Action && Connector +%%======================================== + +%%==================== +%% Specific Rule +%% With rule_id as label key: `rule_id` + +rule_specific() -> + [ + emqx_rule_matched, + emqx_rule_failed, + emqx_rule_passed, + emqx_rule_failed_exception, + emqx_rule_failed_no_result, + emqx_rule_actions_total, + emqx_rule_actions_success, + emqx_rule_actions_failed, + emqx_rule_actions_failed_out_of_service, + emqx_rule_actions_failed_unknown, + emqx_rule_matched_rate, + emqx_rule_matched_rate_last5m, + emqx_rule_matched_rate_max + ]. + +rule_specific_data(Rules) -> + lists:foldl( + fun(#{id := Id} = Rule, AccIn) -> + merge_acc_with_rules(Id, get_metric(Rule), AccIn) + end, + maps:from_keys(rule_specific(), []), + Rules + ). + +merge_acc_with_rules(Id, RuleMetrics, PointsAcc) -> + maps:fold( + fun(K, V, AccIn) -> + AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]} + end, + PointsAcc, + RuleMetrics + ). + +rule_point(Id, V) -> + {[{rule_id, Id}], V}. + +get_metric(#{id := Id} = _Rule) -> + case emqx_metrics_worker:get_metrics(rule_metrics, Id) of + #{counters := Counters, rate := #{matched := MatchedRate}} -> + #{ + emqx_rule_matched => ?MG(matched, Counters), + emqx_rule_failed => ?MG(failed, Counters), + emqx_rule_passed => ?MG(passed, Counters), + emqx_rule_failed_exception => ?MG('failed.exception', Counters), + emqx_rule_failed_no_result => ?MG('failed.no_result', Counters), + emqx_rule_actions_total => ?MG('actions.total', Counters), + emqx_rule_actions_success => ?MG('actions.success', Counters), + emqx_rule_actions_failed => ?MG('actions.failed', Counters), + emqx_rule_actions_failed_out_of_service => ?MG( + 'actions.failed.out_of_service', Counters + ), + emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters), + emqx_rule_matched_rate => ?MG(current, MatchedRate), + emqx_rule_matched_rate_last5m => ?MG(last5m, MatchedRate), + emqx_rule_matched_rate_max => ?MG(max, MatchedRate) + } + end. + +%%==================== +%% Specific Action +%% With action_id: `{type}:{name}` as label key: `action_id` + +action_specific() -> + [ + emqx_action_matched, + emqx_action_dropped, + emqx_action_success, + emqx_action_failed, + emqx_action_rate, + emqx_action_inflight, + emqx_action_received, + emqx_action_late_reply, + emqx_action_retried, + emqx_action_retried_success, + emqx_action_retried_failed, + emqx_action_dropped_resource_stopped, + emqx_action_dropped_resource_not_found, + emqx_action_dropped_queue_full, + emqx_action_dropped_other, + emqx_action_dropped_expired, + emqx_action_queuing, + emqx_action_rate_last5m, + emqx_action_rate_max + ]. + +action_specific_data() -> + lists:foldl( + fun(#{type := Type, name := Name} = _Bridge, AccIn) -> + Id = emqx_bridge_resource:bridge_id(Type, Name), + merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) + end, + maps:from_keys(action_specific(), []), + emqx_bridge:list() + ). + +merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> + maps:fold( + fun(K, V, AccIn) -> + AccIn#{K => [bridge_point(Id, V) | ?MG(K, AccIn)]} + end, + PointsAcc, + BridgeMetrics + ). + +bridge_point(Id, V) -> + {[{action_id, Id}], V}. + +get_bridge_metric(Type, Name) -> + case emqx_bridge:get_metrics(Type, Name) of + #{counters := Counters, rate := #{matched := MatchedRate}, gauges := Gauges} -> + #{ + emqx_action_matched => ?MG0(matched, Counters), + emqx_action_dropped => ?MG0(dropped, Counters), + emqx_action_success => ?MG0(success, Counters), + emqx_action_failed => ?MG0(failed, Counters), + emqx_action_rate => ?MG0(current, MatchedRate), + emqx_action_inflight => ?MG0(inflight, Gauges), + emqx_action_received => ?MG0(received, Counters), + emqx_action_late_reply => ?MG0(late_reply, Counters), + emqx_action_retried => ?MG0(retried, Counters), + emqx_action_retried_success => ?MG0('retried.success', Counters), + emqx_action_retried_failed => ?MG0('retried.failed', Counters), + emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), + emqx_action_dropped_resource_not_found => ?MG0( + 'dropped.resource_not_found', Counters + ), + emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), + emqx_action_dropped_other => ?MG0('dropped.other', Counters), + emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), + emqx_action_queuing => ?MG0(queuing, Gauges), + emqx_action_rate_last5m => ?MG0(last5m, MatchedRate), + emqx_action_rate_max => ?MG0(max, MatchedRate) + } + end. + +%% TODO: Bridge V2 + +%%==================== +%% Specific Connector +%% With connector_id: `{type}:{name}` as label key: `connector_id` + +connector_specific() -> + [ + emqx_connector_enable, + emqx_connector_status + ]. + +connector_specific_data() -> + []. + +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% Help funcs diff --git a/rel/i18n/emqx_prometheus_api.hocon b/rel/i18n/emqx_prometheus_api.hocon index 89999fdd7..0c48e3add 100644 --- a/rel/i18n/emqx_prometheus_api.hocon +++ b/rel/i18n/emqx_prometheus_api.hocon @@ -20,4 +20,9 @@ get_prom_auth_data.desc: get_prom_auth_data.label: """Prometheus Metrics for Auth""" +get_prom_data_integration_data.desc: +"""Get Prometheus Metrics for Data Integration""" +get_prom_data_integration_data.label: +"""Prometheus Metrics for Data Integration""" + }