From b424f8ac1222cf6fe7263bdd1011052021d02001 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 19 Jan 2024 17:35:12 +0800 Subject: [PATCH] feat(prom_stats): aggregated/unaggregated prometheus data --- apps/emqx_prometheus/src/emqx_prometheus.erl | 1135 +++++++++-------- .../src/emqx_prometheus_auth.erl | 130 +- .../src/emqx_prometheus_data_integration.erl | 364 +++--- 3 files changed, 838 insertions(+), 791 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 264d818c9..af35acc36 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -22,6 +22,16 @@ %% be used by the prometheus application -behaviour(prometheus_collector). +-behaviour(emqx_prometheus_cluster). +-export([ + fetch_data_from_local_node/0, + fetch_cluster_consistented_data/0, + aggre_or_zip_init_acc/0, + logic_sum_metrics/0 +]). + +-export([zip_json_prom_stats_metrics/3]). + -include("emqx_prometheus.hrl"). -include_lib("public_key/include/public_key.hrl"). @@ -34,7 +44,7 @@ create_mf/5, gauge_metric/1, gauge_metrics/1, - counter_metric/1 + counter_metrics/1 ] ). @@ -67,12 +77,21 @@ do_stop/0 ]). +%%-------------------------------------------------------------------- +%% Macros +%%-------------------------------------------------------------------- + +-define(MG(K, MAP), maps:get(K, MAP)). +-define(MG0(K, MAP), maps:get(K, MAP, 0)). + -define(C(K, L), proplists:get_value(K, L, 0)). -define(TIMER_MSG, '#interval'). -define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]). +-define(LOGICAL_SUM_METRIC_NAMES, []). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -172,85 +191,96 @@ deregister_cleanup(?PROMETHEUS_DEFAULT_REGISTRY) -> ok. collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> - Metrics = emqx_metrics:all(), - Stats = emqx_stats:getstats(), - VMData = emqx_vm_data(), - ClusterData = emqx_cluster_data(), - CertsData = emqx_certs_data(), + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), %% TODO: license expiry epoch and cert expiry epoch should be cached - _ = [add_collect_family(Name, CertsData, Callback, gauge) || Name <- emqx_certs()], - _ = [add_collect_family(Name, Stats, Callback, gauge) || Name <- emqx_stats:names()], - _ = [add_collect_family(Name, VMData, Callback, gauge) || Name <- emqx_vm()], - _ = [add_collect_family(Name, ClusterData, Callback, gauge) || Name <- emqx_cluster()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_packets()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_messages()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_delivery()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_client()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_session()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_olp()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_acl()], - _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_authn()], - ok = maybe_collect_family_license(Callback), + ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)), + ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)), + ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)), + + ok = add_collect_family(Callback, emqx_packet_metric_meta(), ?MG(emqx_packet_data, RawData)), + ok = add_collect_family(Callback, message_metric_meta(), ?MG(emqx_message_data, RawData)), + ok = add_collect_family(Callback, delivery_metric_meta(), ?MG(emqx_delivery_data, RawData)), + ok = add_collect_family(Callback, client_metric_meta(), ?MG(emqx_client_data, RawData)), + ok = add_collect_family(Callback, session_metric_meta(), ?MG(emqx_session_data, RawData)), + ok = add_collect_family(Callback, olp_metric_meta(), ?MG(emqx_olp_data, RawData)), + ok = add_collect_family(Callback, acl_metric_meta(), ?MG(emqx_acl_data, RawData)), + ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)), + + ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), + ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> ok. %% @private collect(<<"json">>) -> - Metrics = emqx_metrics:all(), - Stats = emqx_stats:getstats(), - VMData = emqx_vm_data(), - %% TODO: FIXME! - %% emqx_metrics_olp()), - %% emqx_metrics_acl()), - %% emqx_metrics_authn()), - (maybe_collect_license())#{ - certs => collect_certs_json(emqx_certs_data()), - stats => maps:from_list([collect_stats(Name, Stats) || Name <- emqx_stats:names()]), - metrics => maps:from_list([collect_stats(Name, VMData) || Name <- emqx_vm()]), - packets => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]), - messages => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]), - delivery => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]), - client => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]), - session => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_session()]) + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), + (maybe_license_collect_json_data(RawData))#{ + stats => collect_json_data(?MG(stats_data, RawData)), + metrics => collect_json_data(?MG(vm_data, RawData)), + packets => collect_json_data(?MG(emqx_packet_data, RawData)), + messages => collect_json_data(?MG(emqx_message_data, RawData)), + delivery => collect_json_data(?MG(emqx_delivery_data, RawData)), + client => collect_json_data(?MG(emqx_client_data, RawData)), + session => collect_json_data(?MG(emqx_session_data, RawData)), + cluster => collect_json_data(?MG(cluster_data, RawData)), + olp => collect_json_data(?MG(emqx_olp_data, RawData)), + acl => collect_json_data(?MG(emqx_acl_data, RawData)), + authn => collect_json_data(?MG(emqx_authn_data, RawData)), + certs => collect_cert_json_data(?MG(cert_data, RawData)) }; collect(<<"prometheus">>) -> prometheus_text_format:format(?PROMETHEUS_DEFAULT_REGISTRY). -%% @private -collect_stats(Name, Stats) -> - R = collect_metrics(Name, Stats), - case R#'Metric'.gauge of - undefined -> - {_, Val} = R#'Metric'.counter, - {Name, Val}; - {_, Val} -> - {Name, Val} - end. - collect_metrics(Name, Metrics) -> emqx_collect(Name, Metrics). +add_collect_family(Callback, MetricWithType, Data) -> + _ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type, _} <- MetricWithType], + ok. + add_collect_family(Name, Data, Callback, Type) -> Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). --if(?EMQX_RELEASE_EDITION == ee). -maybe_collect_family_license(Callback) -> - LicenseData = emqx_license_data(), - _ = [add_collect_family(Name, LicenseData, Callback, gauge) || Name <- emqx_license()], - ok. +%% behaviour +fetch_data_from_local_node() -> + {node(self()), #{ + stats_data => stats_data(), + vm_data => vm_data(), + cluster_data => cluster_data(), + %% Metrics + emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta()), + emqx_message_data => emqx_metric_data(message_metric_meta()), + emqx_delivery_data => emqx_metric_data(delivery_metric_meta()), + emqx_client_data => emqx_metric_data(client_metric_meta()), + emqx_session_data => emqx_metric_data(session_metric_meta()), + emqx_olp_data => emqx_metric_data(olp_metric_meta()), + emqx_acl_data => emqx_metric_data(acl_metric_meta()), + emqx_authn_data => emqx_metric_data(authn_metric_meta()) + }}. -maybe_collect_license() -> - LicenseData = emqx_license_data(), - #{license => maps:from_list([collect_stats(Name, LicenseData) || Name <- emqx_license()])}. +fetch_cluster_consistented_data() -> + (maybe_license_fetch_data())#{ + cert_data => cert_data() + }. --else. -maybe_collect_family_license(_) -> - ok. +aggre_or_zip_init_acc() -> + #{ + stats_data => maps:from_keys(metrics_name(stats_metric_meta()), []), + vm_data => maps:from_keys(metrics_name(vm_metric_meta()), []), + cluster_data => maps:from_keys(metrics_name(cluster_metric_meta()), []), + emqx_packet_data => maps:from_keys(metrics_name(emqx_packet_metric_meta()), []), + emqx_message_data => maps:from_keys(metrics_name(message_metric_meta()), []), + emqx_delivery_data => maps:from_keys(metrics_name(delivery_metric_meta()), []), + emqx_client_data => maps:from_keys(metrics_name(client_metric_meta()), []), + emqx_session_data => maps:from_keys(metrics_name(session_metric_meta()), []), + emqx_olp_data => maps:from_keys(metrics_name(olp_metric_meta()), []), + emqx_acl_data => maps:from_keys(metrics_name(acl_metric_meta()), []), + emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []) + }. -maybe_collect_license() -> - #{}. --endif. +logic_sum_metrics() -> + ?LOGICAL_SUM_METRIC_NAMES. %%-------------------------------------------------------------------- %% Collector @@ -258,512 +288,513 @@ maybe_collect_license() -> %%-------------------------------------------------------------------- %% Stats - %% connections -emqx_collect(emqx_connections_count, Stats) -> - gauge_metric(?C('connections.count', Stats)); -emqx_collect(emqx_connections_max, Stats) -> - gauge_metric(?C('connections.max', Stats)); -emqx_collect(emqx_live_connections_count, Stats) -> - gauge_metric(?C('live_connections.count', Stats)); -emqx_collect(emqx_live_connections_max, Stats) -> - gauge_metric(?C('live_connections.max', Stats)); +emqx_collect(K = emqx_connections_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_connections_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_live_connections_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_live_connections_max, D) -> gauge_metrics(?MG(K, D)); %% sessions -emqx_collect(emqx_sessions_count, Stats) -> - gauge_metric(?C('sessions.count', Stats)); -emqx_collect(emqx_sessions_max, Stats) -> - gauge_metric(?C('sessions.max', Stats)); -emqx_collect(emqx_channels_count, Stats) -> - gauge_metric(?C('channels.count', Stats)); -emqx_collect(emqx_channels_max, Stats) -> - gauge_metric(?C('channels.max', Stats)); +emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D)); %% pub/sub stats -emqx_collect(emqx_topics_count, Stats) -> - gauge_metric(?C('topics.count', Stats)); -emqx_collect(emqx_topics_max, Stats) -> - gauge_metric(?C('topics.max', Stats)); -emqx_collect(emqx_suboptions_count, Stats) -> - gauge_metric(?C('suboptions.count', Stats)); -emqx_collect(emqx_suboptions_max, Stats) -> - gauge_metric(?C('suboptions.max', Stats)); -emqx_collect(emqx_subscribers_count, Stats) -> - gauge_metric(?C('subscribers.count', Stats)); -emqx_collect(emqx_subscribers_max, Stats) -> - gauge_metric(?C('subscribers.max', Stats)); -emqx_collect(emqx_subscriptions_count, Stats) -> - gauge_metric(?C('subscriptions.count', Stats)); -emqx_collect(emqx_subscriptions_max, Stats) -> - gauge_metric(?C('subscriptions.max', Stats)); -emqx_collect(emqx_subscriptions_shared_count, Stats) -> - gauge_metric(?C('subscriptions.shared.count', Stats)); -emqx_collect(emqx_subscriptions_shared_max, Stats) -> - gauge_metric(?C('subscriptions.shared.max', Stats)); +emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_suboptions_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscribers_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscribers_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscriptions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscriptions_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscriptions_shared_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_subscriptions_shared_max, D) -> gauge_metrics(?MG(K, D)); %% retained -emqx_collect(emqx_retained_count, Stats) -> - gauge_metric(?C('retained.count', Stats)); -emqx_collect(emqx_retained_max, Stats) -> - gauge_metric(?C('retained.max', Stats)); +emqx_collect(K = emqx_retained_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_retained_max, D) -> gauge_metrics(?MG(K, D)); %% delayed -emqx_collect(emqx_delayed_count, Stats) -> - gauge_metric(?C('delayed.count', Stats)); -emqx_collect(emqx_delayed_max, Stats) -> - gauge_metric(?C('delayed.max', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - packets & bytes - -%% bytes -emqx_collect(emqx_bytes_received, Metrics) -> - counter_metric(?C('bytes.received', Metrics)); -emqx_collect(emqx_bytes_sent, Metrics) -> - counter_metric(?C('bytes.sent', Metrics)); -%% received.sent -emqx_collect(emqx_packets_received, Metrics) -> - counter_metric(?C('packets.received', Metrics)); -emqx_collect(emqx_packets_sent, Metrics) -> - counter_metric(?C('packets.sent', Metrics)); -%% connect -emqx_collect(emqx_packets_connect, Metrics) -> - counter_metric(?C('packets.connect.received', Metrics)); -emqx_collect(emqx_packets_connack_sent, Metrics) -> - counter_metric(?C('packets.connack.sent', Metrics)); -emqx_collect(emqx_packets_connack_error, Metrics) -> - counter_metric(?C('packets.connack.error', Metrics)); -emqx_collect(emqx_packets_connack_auth_error, Metrics) -> - counter_metric(?C('packets.connack.auth_error', Metrics)); -%% sub.unsub -emqx_collect(emqx_packets_subscribe_received, Metrics) -> - counter_metric(?C('packets.subscribe.received', Metrics)); -emqx_collect(emqx_packets_subscribe_auth_error, Metrics) -> - counter_metric(?C('packets.subscribe.auth_error', Metrics)); -emqx_collect(emqx_packets_subscribe_error, Metrics) -> - counter_metric(?C('packets.subscribe.error', Metrics)); -emqx_collect(emqx_packets_suback_sent, Metrics) -> - counter_metric(?C('packets.suback.sent', Metrics)); -emqx_collect(emqx_packets_unsubscribe_received, Metrics) -> - counter_metric(?C('packets.unsubscribe.received', Metrics)); -emqx_collect(emqx_packets_unsubscribe_error, Metrics) -> - counter_metric(?C('packets.unsubscribe.error', Metrics)); -emqx_collect(emqx_packets_unsuback_sent, Metrics) -> - counter_metric(?C('packets.unsuback.sent', Metrics)); -%% publish.puback -emqx_collect(emqx_packets_publish_received, Metrics) -> - counter_metric(?C('packets.publish.received', Metrics)); -emqx_collect(emqx_packets_publish_sent, Metrics) -> - counter_metric(?C('packets.publish.sent', Metrics)); -emqx_collect(emqx_packets_publish_inuse, Metrics) -> - counter_metric(?C('packets.publish.inuse', Metrics)); -emqx_collect(emqx_packets_publish_error, Metrics) -> - counter_metric(?C('packets.publish.error', Metrics)); -emqx_collect(emqx_packets_publish_auth_error, Metrics) -> - counter_metric(?C('packets.publish.auth_error', Metrics)); -emqx_collect(emqx_packets_publish_dropped, Metrics) -> - counter_metric(?C('packets.publish.dropped', Metrics)); -%% puback -emqx_collect(emqx_packets_puback_received, Metrics) -> - counter_metric(?C('packets.puback.received', Metrics)); -emqx_collect(emqx_packets_puback_sent, Metrics) -> - counter_metric(?C('packets.puback.sent', Metrics)); -emqx_collect(emqx_packets_puback_inuse, Metrics) -> - counter_metric(?C('packets.puback.inuse', Metrics)); -emqx_collect(emqx_packets_puback_missed, Metrics) -> - counter_metric(?C('packets.puback.missed', Metrics)); -%% pubrec -emqx_collect(emqx_packets_pubrec_received, Metrics) -> - counter_metric(?C('packets.pubrec.received', Metrics)); -emqx_collect(emqx_packets_pubrec_sent, Metrics) -> - counter_metric(?C('packets.pubrec.sent', Metrics)); -emqx_collect(emqx_packets_pubrec_inuse, Metrics) -> - counter_metric(?C('packets.pubrec.inuse', Metrics)); -emqx_collect(emqx_packets_pubrec_missed, Metrics) -> - counter_metric(?C('packets.pubrec.missed', Metrics)); -%% pubrel -emqx_collect(emqx_packets_pubrel_received, Metrics) -> - counter_metric(?C('packets.pubrel.received', Metrics)); -emqx_collect(emqx_packets_pubrel_sent, Metrics) -> - counter_metric(?C('packets.pubrel.sent', Metrics)); -emqx_collect(emqx_packets_pubrel_missed, Metrics) -> - counter_metric(?C('packets.pubrel.missed', Metrics)); -%% pubcomp -emqx_collect(emqx_packets_pubcomp_received, Metrics) -> - counter_metric(?C('packets.pubcomp.received', Metrics)); -emqx_collect(emqx_packets_pubcomp_sent, Metrics) -> - counter_metric(?C('packets.pubcomp.sent', Metrics)); -emqx_collect(emqx_packets_pubcomp_inuse, Metrics) -> - counter_metric(?C('packets.pubcomp.inuse', Metrics)); -emqx_collect(emqx_packets_pubcomp_missed, Metrics) -> - counter_metric(?C('packets.pubcomp.missed', Metrics)); -%% pingreq -emqx_collect(emqx_packets_pingreq_received, Metrics) -> - counter_metric(?C('packets.pingreq.received', Metrics)); -emqx_collect(emqx_packets_pingresp_sent, Metrics) -> - counter_metric(?C('packets.pingresp.sent', Metrics)); -%% disconnect -emqx_collect(emqx_packets_disconnect_received, Metrics) -> - counter_metric(?C('packets.disconnect.received', Metrics)); -emqx_collect(emqx_packets_disconnect_sent, Metrics) -> - counter_metric(?C('packets.disconnect.sent', Metrics)); -%% auth -emqx_collect(emqx_packets_auth_received, Metrics) -> - counter_metric(?C('packets.auth.received', Metrics)); -emqx_collect(emqx_packets_auth_sent, Metrics) -> - counter_metric(?C('packets.auth.sent', Metrics)); -%%-------------------------------------------------------------------- -%% Metrics - messages - -%% messages -emqx_collect(emqx_messages_received, Metrics) -> - counter_metric(?C('messages.received', Metrics)); -emqx_collect(emqx_messages_sent, Metrics) -> - counter_metric(?C('messages.sent', Metrics)); -emqx_collect(emqx_messages_qos0_received, Metrics) -> - counter_metric(?C('messages.qos0.received', Metrics)); -emqx_collect(emqx_messages_qos0_sent, Metrics) -> - counter_metric(?C('messages.qos0.sent', Metrics)); -emqx_collect(emqx_messages_qos1_received, Metrics) -> - counter_metric(?C('messages.qos1.received', Metrics)); -emqx_collect(emqx_messages_qos1_sent, Metrics) -> - counter_metric(?C('messages.qos1.sent', Metrics)); -emqx_collect(emqx_messages_qos2_received, Metrics) -> - counter_metric(?C('messages.qos2.received', Metrics)); -emqx_collect(emqx_messages_qos2_sent, Metrics) -> - counter_metric(?C('messages.qos2.sent', Metrics)); -emqx_collect(emqx_messages_publish, Metrics) -> - counter_metric(?C('messages.publish', Metrics)); -emqx_collect(emqx_messages_dropped, Metrics) -> - counter_metric(?C('messages.dropped', Metrics)); -emqx_collect(emqx_messages_dropped_expired, Metrics) -> - counter_metric(?C('messages.dropped.await_pubrel_timeout', Metrics)); -emqx_collect(emqx_messages_dropped_no_subscribers, Metrics) -> - counter_metric(?C('messages.dropped.no_subscribers', Metrics)); -emqx_collect(emqx_messages_forward, Metrics) -> - counter_metric(?C('messages.forward', Metrics)); -emqx_collect(emqx_messages_retained, Metrics) -> - counter_metric(?C('messages.retained', Metrics)); -emqx_collect(emqx_messages_delayed, Stats) -> - counter_metric(?C('messages.delayed', Stats)); -emqx_collect(emqx_messages_delivered, Stats) -> - counter_metric(?C('messages.delivered', Stats)); -emqx_collect(emqx_messages_acked, Stats) -> - counter_metric(?C('messages.acked', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - delivery - -emqx_collect(emqx_delivery_dropped, Stats) -> - counter_metric(?C('delivery.dropped', Stats)); -emqx_collect(emqx_delivery_dropped_no_local, Stats) -> - counter_metric(?C('delivery.dropped.no_local', Stats)); -emqx_collect(emqx_delivery_dropped_too_large, Stats) -> - counter_metric(?C('delivery.dropped.too_large', Stats)); -emqx_collect(emqx_delivery_dropped_qos0_msg, Stats) -> - counter_metric(?C('delivery.dropped.qos0_msg', Stats)); -emqx_collect(emqx_delivery_dropped_queue_full, Stats) -> - counter_metric(?C('delivery.dropped.queue_full', Stats)); -emqx_collect(emqx_delivery_dropped_expired, Stats) -> - counter_metric(?C('delivery.dropped.expired', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - client -emqx_collect(emqx_client_connect, Stats) -> - counter_metric(?C('client.connect', Stats)); -emqx_collect(emqx_client_connack, Stats) -> - counter_metric(?C('client.connack', Stats)); -emqx_collect(emqx_client_connected, Stats) -> - counter_metric(?C('client.connected', Stats)); -emqx_collect(emqx_client_authenticate, Stats) -> - counter_metric(?C('client.authenticate', Stats)); -emqx_collect(emqx_client_auth_anonymous, Stats) -> - counter_metric(?C('client.auth.anonymous', Stats)); -emqx_collect(emqx_client_authorize, Stats) -> - counter_metric(?C('client.authorize', Stats)); -emqx_collect(emqx_client_subscribe, Stats) -> - counter_metric(?C('client.subscribe', Stats)); -emqx_collect(emqx_client_unsubscribe, Stats) -> - counter_metric(?C('client.unsubscribe', Stats)); -emqx_collect(emqx_client_disconnected, Stats) -> - counter_metric(?C('client.disconnected', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - session - -emqx_collect(emqx_session_created, Stats) -> - counter_metric(?C('session.created', Stats)); -emqx_collect(emqx_session_resumed, Stats) -> - counter_metric(?C('session.resumed', Stats)); -emqx_collect(emqx_session_takenover, Stats) -> - counter_metric(?C('session.takenover', Stats)); -emqx_collect(emqx_session_discarded, Stats) -> - counter_metric(?C('session.discarded', Stats)); -emqx_collect(emqx_session_terminated, Stats) -> - counter_metric(?C('session.terminated', Stats)); -%%-------------------------------------------------------------------- - -%% Metrics - overload protection -emqx_collect(emqx_overload_protection_delay_ok, Stats) -> - counter_metric(?C('overload_protection.delay.ok', Stats)); -emqx_collect(emqx_overload_protection_delay_timeout, Stats) -> - counter_metric(?C('overload_protection.delay.timeout', Stats)); -emqx_collect(emqx_overload_protection_hibernation, Stats) -> - counter_metric(?C('overload_protection.hibernation', Stats)); -emqx_collect(emqx_overload_protection_gc, Stats) -> - counter_metric(?C('overload_protection.gc', Stats)); -emqx_collect(emqx_overload_protection_new_conn, Stats) -> - counter_metric(?C('overload_protection.new_conn', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - acl -emqx_collect(emqx_authorization_allow, Stats) -> - counter_metric(?C('authorization.allow', Stats)); -emqx_collect(emqx_authorization_deny, Stats) -> - counter_metric(?C('authorization.deny', Stats)); -emqx_collect(emqx_authorization_cache_hit, Stats) -> - counter_metric(?C('authorization.cache_hit', Stats)); -emqx_collect(emqx_authorization_cache_miss, Stats) -> - counter_metric(?C('authorization.cache_miss', Stats)); -emqx_collect(emqx_authorization_superuser, Stats) -> - counter_metric(?C('authorization.superuser', Stats)); -emqx_collect(emqx_authorization_nomatch, Stats) -> - counter_metric(?C('authorization.nomatch', Stats)); -emqx_collect(emqx_authorization_matched_allow, Stats) -> - counter_metric(?C('authorization.matched_allow', Stats)); -emqx_collect(emqx_authorization_matched_deny, Stats) -> - counter_metric(?C('authorization.matched_deny', Stats)); -%%-------------------------------------------------------------------- -%% Metrics - authn -emqx_collect(emqx_authentication_success, Stats) -> - counter_metric(?C('authentication.success', Stats)); -emqx_collect(emqx_authentication_success_anonymous, Stats) -> - counter_metric(?C('authentication.success.anonymous', Stats)); -emqx_collect(emqx_authentication_failure, Stats) -> - counter_metric(?C('authentication.failure', Stats)); +emqx_collect(K = emqx_delayed_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_delayed_max, D) -> gauge_metrics(?MG(K, D)); %%-------------------------------------------------------------------- %% VM -emqx_collect(emqx_vm_cpu_use, VMData) -> - gauge_metric(?C(cpu_use, VMData)); -emqx_collect(emqx_vm_cpu_idle, VMData) -> - gauge_metric(?C(cpu_idle, VMData)); -emqx_collect(emqx_vm_run_queue, VMData) -> - gauge_metric(?C(run_queue, VMData)); -emqx_collect(emqx_vm_process_messages_in_queues, VMData) -> - gauge_metric(?C(process_total_messages, VMData)); -emqx_collect(emqx_vm_total_memory, VMData) -> - gauge_metric(?C(total_memory, VMData)); -emqx_collect(emqx_vm_used_memory, VMData) -> - gauge_metric(?C(used_memory, VMData)); -emqx_collect(emqx_cluster_nodes_running, ClusterData) -> - gauge_metric(?C(nodes_running, ClusterData)); -emqx_collect(emqx_cluster_nodes_stopped, ClusterData) -> - gauge_metric(?C(nodes_stopped, ClusterData)); +emqx_collect(K = emqx_vm_cpu_use, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_vm_cpu_idle, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_vm_run_queue, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_vm_process_messages_in_queues, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_vm_total_memory, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_vm_used_memory, D) -> gauge_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Cluster Info +emqx_collect(K = emqx_cluster_nodes_running, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_cluster_nodes_stopped, D) -> gauge_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - packets & bytes +%% bytes +emqx_collect(K = emqx_bytes_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_bytes_sent, D) -> counter_metrics(?MG(K, D)); +%% received.sent +emqx_collect(K = emqx_packets_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_sent, D) -> counter_metrics(?MG(K, D)); +%% connect +emqx_collect(K = emqx_packets_connect, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_connack_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_connack_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_connack_auth_error, D) -> counter_metrics(?MG(K, D)); +%% sub.unsub +emqx_collect(K = emqx_packets_subscribe_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_subscribe_auth_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_subscribe_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_suback_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_unsubscribe_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_unsubscribe_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_unsuback_sent, D) -> counter_metrics(?MG(K, D)); +%% publish.puback +emqx_collect(K = emqx_packets_publish_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_publish_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_publish_inuse, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_publish_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_publish_auth_error, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_publish_dropped, D) -> counter_metrics(?MG(K, D)); +%% puback +emqx_collect(K = emqx_packets_puback_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_puback_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_puback_inuse, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_puback_missed, D) -> counter_metrics(?MG(K, D)); +%% pubrec +emqx_collect(K = emqx_packets_pubrec_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubrec_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubrec_inuse, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubrec_missed, D) -> counter_metrics(?MG(K, D)); +%% pubrel +emqx_collect(K = emqx_packets_pubrel_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubrel_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubrel_missed, D) -> counter_metrics(?MG(K, D)); +%% pubcomp +emqx_collect(K = emqx_packets_pubcomp_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubcomp_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubcomp_inuse, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pubcomp_missed, D) -> counter_metrics(?MG(K, D)); +%% pingreq +emqx_collect(K = emqx_packets_pingreq_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_pingresp_sent, D) -> counter_metrics(?MG(K, D)); +%% disconnect +emqx_collect(K = emqx_packets_disconnect_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_disconnect_sent, D) -> counter_metrics(?MG(K, D)); +%% auth +emqx_collect(K = emqx_packets_auth_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_packets_auth_sent, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - messages +%% messages +emqx_collect(K = emqx_messages_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos0_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos0_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos1_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos1_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos2_received, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_qos2_sent, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_publish, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_dropped, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_dropped_expired, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_dropped_no_subscribers, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_forward, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_retained, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_delayed, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_delivered, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_messages_acked, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - delivery +emqx_collect(K = emqx_delivery_dropped, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_delivery_dropped_no_local, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_delivery_dropped_too_large, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_delivery_dropped_qos0_msg, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_delivery_dropped_queue_full, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_delivery_dropped_expired, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - client +emqx_collect(K = emqx_client_connect, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_connack, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_connected, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_authenticate, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_auth_anonymous, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_authorize, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_subscribe, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_unsubscribe, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_client_disconnected, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - session +emqx_collect(K = emqx_session_created, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_session_resumed, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_session_takenover, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_session_discarded, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_session_terminated, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - overload protection +emqx_collect(K = emqx_overload_protection_delay_ok, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_overload_protection_delay_timeout, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_overload_protection_hibernation, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_overload_protection_gc, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_overload_protection_new_conn, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - acl +emqx_collect(K = emqx_authorization_allow, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_deny, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_cache_hit, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_cache_miss, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_superuser, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_nomatch, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_matched_allow, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authorization_matched_deny, D) -> counter_metrics(?MG(K, D)); +%%-------------------------------------------------------------------- +%% Metrics - authn +emqx_collect(K = emqx_authentication_success, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authentication_success_anonymous, D) -> counter_metrics(?MG(K, D)); +emqx_collect(K = emqx_authentication_failure, D) -> counter_metrics(?MG(K, D)); %%-------------------------------------------------------------------- %% License -emqx_collect(emqx_license_expiry_at, LicenseData) -> - gauge_metric(?C(expiry_at, LicenseData)); +emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D)); %%-------------------------------------------------------------------- %% Certs -emqx_collect(emqx_cert_expiry_at, CertsData) -> - gauge_metrics(CertsData). +emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)). %%-------------------------------------------------------------------- %% Indicators %%-------------------------------------------------------------------- -emqx_metrics_packets() -> +%%======================================== +%% Stats +%%======================================== + +stats_metric_meta() -> [ - emqx_bytes_received, - emqx_bytes_sent, - emqx_packets_received, - emqx_packets_sent, - emqx_packets_connect, - emqx_packets_connack_sent, - emqx_packets_connack_error, - emqx_packets_connack_auth_error, - emqx_packets_publish_received, - emqx_packets_publish_sent, - emqx_packets_publish_inuse, - emqx_packets_publish_error, - emqx_packets_publish_auth_error, - emqx_packets_publish_dropped, - emqx_packets_puback_received, - emqx_packets_puback_sent, - emqx_packets_puback_inuse, - emqx_packets_puback_missed, - emqx_packets_pubrec_received, - emqx_packets_pubrec_sent, - emqx_packets_pubrec_inuse, - emqx_packets_pubrec_missed, - emqx_packets_pubrel_received, - emqx_packets_pubrel_sent, - emqx_packets_pubrel_missed, - emqx_packets_pubcomp_received, - emqx_packets_pubcomp_sent, - emqx_packets_pubcomp_inuse, - emqx_packets_pubcomp_missed, - emqx_packets_subscribe_received, - emqx_packets_subscribe_error, - emqx_packets_subscribe_auth_error, - emqx_packets_suback_sent, - emqx_packets_unsubscribe_received, - emqx_packets_unsubscribe_error, - emqx_packets_unsuback_sent, - emqx_packets_pingreq_received, - emqx_packets_pingresp_sent, - emqx_packets_disconnect_received, - emqx_packets_disconnect_sent, - emqx_packets_auth_received, - emqx_packets_auth_sent + %% connections + {emqx_connections_count, counter, 'connections.count'}, + {emqx_connections_max, counter, 'connections.max'}, + {emqx_live_connections_count, counter, 'live_connections.count'}, + {emqx_live_connections_max, counter, 'live_connections.max'}, + %% sessions + {emqx_sessions_count, counter, 'sessions.count'}, + {emqx_sessions_max, counter, 'sessions.max'}, + {emqx_channels_count, counter, 'channels.count'}, + {emqx_channels_max, counter, 'channels.max'}, + %% pub/sub stats + {emqx_topics_count, counter, 'topics.count'}, + {emqx_topics_max, counter, 'topics.max'}, + {emqx_suboptions_count, counter, 'suboptions.count'}, + {emqx_suboptions_max, counter, 'suboptions.max'}, + {emqx_subscribers_count, counter, 'subscribers.count'}, + {emqx_subscribers_max, counter, 'subscribers.max'}, + {emqx_subscriptions_count, counter, 'subscriptions.count'}, + {emqx_subscriptions_max, counter, 'subscriptions.max'}, + {emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'}, + {emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'}, + %% retained + {emqx_retained_count, counter, 'retained.count'}, + {emqx_retained_max, counter, 'retained.max'}, + %% delayed + {emqx_delayed_count, counter, 'delayed.count'}, + {emqx_delayed_max, counter, 'delayed.max'} ]. -emqx_metrics_olp() -> - case emqx_config_zones:is_olp_enabled() of - true -> - [ - emqx_overload_protection_delay_ok, - emqx_overload_protection_delay_timeout, - emqx_overload_protection_hibernation, - emqx_overload_protection_gc, - emqx_overload_protection_new_conn - ]; - false -> - [] - end. +stats_data() -> + Stats = emqx_stats:getstats(), + lists:foldl( + fun({Name, _Type, MetricKAtom}, AccIn) -> + AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} + end, + #{}, + stats_metric_meta() + ). -emqx_metrics_acl() -> +%%======================================== +%% Erlang VM +%%======================================== + +vm_metric_meta() -> [ - emqx_authorization_allow, - emqx_authorization_deny, - emqx_authorization_cache_hit, - emqx_authorization_cache_miss, - emqx_authorization_superuser, - emqx_authorization_nomatch, - emqx_authorization_matched_allow, - emqx_authorization_matched_deny + {emqx_vm_cpu_use, gauge, 'cpu_use'}, + {emqx_vm_cpu_idle, gauge, 'cpu_idle'}, + {emqx_vm_run_queue, gauge, 'run_queue'}, + {emqx_vm_process_messages_in_queues, gauge, 'process_total_messages'}, + {emqx_vm_total_memory, gauge, 'total_memory'}, + {emqx_vm_used_memory, gauge, 'used_memory'} ]. -emqx_metrics_authn() -> +vm_data() -> + VmStats = emqx_mgmt:vm_stats(), + lists:foldl( + fun({Name, _Type, MetricKAtom}, AccIn) -> + AccIn#{Name => [{[], ?C(MetricKAtom, VmStats)}]} + end, + #{}, + vm_metric_meta() + ). + +%%======================================== +%% Cluster +%%======================================== + +cluster_metric_meta() -> [ - emqx_authentication_success, - emqx_authentication_success_anonymous, - emqx_authentication_failure + {emqx_cluster_nodes_running, gauge, undefined}, + {emqx_cluster_nodes_stopped, gauge, undefined} ]. -emqx_metrics_messages() -> - [ - emqx_messages_received, - emqx_messages_sent, - emqx_messages_qos0_received, - emqx_messages_qos0_sent, - emqx_messages_qos1_received, - emqx_messages_qos1_sent, - emqx_messages_qos2_received, - emqx_messages_qos2_sent, - emqx_messages_publish, - emqx_messages_dropped, - emqx_messages_dropped_expired, - emqx_messages_dropped_no_subscribers, - emqx_messages_forward, - emqx_messages_retained, - emqx_messages_delayed, - emqx_messages_delivered, - emqx_messages_acked - ]. - -emqx_metrics_delivery() -> - [ - emqx_delivery_dropped, - emqx_delivery_dropped_no_local, - emqx_delivery_dropped_too_large, - emqx_delivery_dropped_qos0_msg, - emqx_delivery_dropped_queue_full, - emqx_delivery_dropped_expired - ]. - -emqx_metrics_client() -> - [ - emqx_client_connect, - emqx_client_connack, - emqx_client_connected, - emqx_client_authenticate, - emqx_client_auth_anonymous, - emqx_client_authorize, - emqx_client_subscribe, - emqx_client_unsubscribe, - emqx_client_disconnected - ]. - -emqx_metrics_session() -> - [ - emqx_session_created, - emqx_session_resumed, - emqx_session_takenover, - emqx_session_discarded, - emqx_session_terminated - ]. - -emqx_vm() -> - [ - emqx_vm_cpu_use, - emqx_vm_cpu_idle, - emqx_vm_run_queue, - emqx_vm_process_messages_in_queues, - emqx_vm_total_memory, - emqx_vm_used_memory - ]. - -emqx_vm_data() -> - emqx_mgmt:vm_stats(). - -emqx_cluster() -> - [ - emqx_cluster_nodes_running, - emqx_cluster_nodes_stopped - ]. - -emqx_cluster_data() -> +cluster_data() -> Running = emqx:cluster_nodes(running), Stopped = emqx:cluster_nodes(stopped), + #{ + emqx_cluster_nodes_running => [{[], length(Running)}], + emqx_cluster_nodes_stopped => [{[], length(Stopped)}] + }. + +%%======================================== +%% Metrics +%%======================================== + +emqx_metric_data(MetricNameTypeKeyL) -> + Metrics = emqx_metrics:all(), + lists:foldl( + fun({Name, _Type, MetricKAtom}, AccIn) -> + AccIn#{Name => [{[], ?C(MetricKAtom, Metrics)}]} + end, + #{}, + MetricNameTypeKeyL + ). + +%%========== +%% Bytes && Packets +emqx_packet_metric_meta() -> [ - {nodes_running, length(Running)}, - {nodes_stopped, length(Stopped)} + {emqx_bytes_received, counter, 'bytes.received'}, + {emqx_bytes_sent, counter, 'bytes.sent'}, + %% received.sent + {emqx_packets_received, counter, 'packets.received'}, + {emqx_packets_sent, counter, 'packets.sent'}, + %% connect + {emqx_packets_connect, counter, 'packets.connect.received'}, + {emqx_packets_connack_sent, counter, 'packets.connack.sent'}, + {emqx_packets_connack_error, counter, 'packets.connack.error'}, + {emqx_packets_connack_auth_error, counter, 'packets.connack.auth_error'}, + %% sub.unsub + {emqx_packets_subscribe_received, counter, 'packets.subscribe.received'}, + {emqx_packets_subscribe_auth_error, counter, 'packets.subscribe.auth_error'}, + {emqx_packets_subscribe_error, counter, 'packets.subscribe.error'}, + {emqx_packets_suback_sent, counter, 'packets.suback.sent'}, + {emqx_packets_unsubscribe_received, counter, 'packets.unsubscribe.received'}, + {emqx_packets_unsubscribe_error, counter, 'packets.unsubscribe.error'}, + {emqx_packets_unsuback_sent, counter, 'packets.unsuback.sent'}, + %% publish.puback + {emqx_packets_publish_received, counter, 'packets.publish.received'}, + {emqx_packets_publish_sent, counter, 'packets.publish.sent'}, + {emqx_packets_publish_inuse, counter, 'packets.publish.inuse'}, + {emqx_packets_publish_error, counter, 'packets.publish.error'}, + {emqx_packets_publish_auth_error, counter, 'packets.publish.auth_error'}, + {emqx_packets_publish_dropped, counter, 'packets.publish.dropped'}, + %% puback + {emqx_packets_puback_received, counter, 'packets.puback.received'}, + {emqx_packets_puback_sent, counter, 'packets.puback.sent'}, + {emqx_packets_puback_inuse, counter, 'packets.puback.inuse'}, + {emqx_packets_puback_missed, counter, 'packets.puback.missed'}, + %% pubrec + {emqx_packets_pubrec_received, counter, 'packets.pubrec.received'}, + {emqx_packets_pubrec_sent, counter, 'packets.pubrec.sent'}, + {emqx_packets_pubrec_inuse, counter, 'packets.pubrec.inuse'}, + {emqx_packets_pubrec_missed, counter, 'packets.pubrec.missed'}, + %% pubrel + {emqx_packets_pubrel_received, counter, 'packets.pubrel.received'}, + {emqx_packets_pubrel_sent, counter, 'packets.pubrel.sent'}, + {emqx_packets_pubrel_missed, counter, 'packets.pubrel.missed'}, + %% pubcomp + {emqx_packets_pubcomp_received, counter, 'packets.pubcomp.received'}, + {emqx_packets_pubcomp_sent, counter, 'packets.pubcomp.sent'}, + {emqx_packets_pubcomp_inuse, counter, 'packets.pubcomp.inuse'}, + {emqx_packets_pubcomp_missed, counter, 'packets.pubcomp.missed'}, + %% pingreq + {emqx_packets_pingreq_received, counter, 'packets.pingreq.received'}, + {emqx_packets_pingresp_sent, counter, 'packets.pingresp.sent'}, + %% disconnect + {emqx_packets_disconnect_received, counter, 'packets.disconnect.received'}, + {emqx_packets_disconnect_sent, counter, 'packets.disconnect.sent'}, + %% auth + {emqx_packets_auth_received, counter, 'packets.auth.received'}, + {emqx_packets_auth_sent, counter, 'packets.auth.sent'} ]. +%%========== +%% Messages +message_metric_meta() -> + [ + {emqx_messages_received, counter, 'messages.received'}, + {emqx_messages_sent, counter, 'messages.sent'}, + {emqx_messages_qos0_received, counter, 'messages.qos0.received'}, + {emqx_messages_qos0_sent, counter, 'messages.qos0.sent'}, + {emqx_messages_qos1_received, counter, 'messages.qos1.received'}, + {emqx_messages_qos1_sent, counter, 'messages.qos1.sent'}, + {emqx_messages_qos2_received, counter, 'messages.qos2.received'}, + {emqx_messages_qos2_sent, counter, 'messages.qos2.sent'}, + {emqx_messages_publish, counter, 'messages.publish'}, + {emqx_messages_dropped, counter, 'messages.dropped'}, + {emqx_messages_dropped_expired, counter, 'messages.dropped.await_pubrel_timeout'}, + {emqx_messages_dropped_no_subscribers, counter, 'messages.dropped.no_subscribers'}, + {emqx_messages_forward, counter, 'messages.forward'}, + {emqx_messages_retained, counter, 'messages.retained'}, + {emqx_messages_delayed, counter, 'messages.delayed'}, + {emqx_messages_delivered, counter, 'messages.delivered'}, + {emqx_messages_acked, counter, 'messages.acked'} + ]. + +%%========== +%% Delivery +delivery_metric_meta() -> + [ + {emqx_delivery_dropped, counter, 'delivery.dropped'}, + {emqx_delivery_dropped_no_local, counter, 'delivery.dropped.no_local'}, + {emqx_delivery_dropped_too_large, counter, 'delivery.dropped.too_large'}, + {emqx_delivery_dropped_qos0_msg, counter, 'delivery.dropped.qos0_msg'}, + {emqx_delivery_dropped_queue_full, counter, 'delivery.dropped.queue_full'}, + {emqx_delivery_dropped_expired, counter, 'delivery.dropped.expired'} + ]. + +%%========== +%% Client +client_metric_meta() -> + [ + {emqx_client_connect, counter, 'client.connect'}, + {emqx_client_connack, counter, 'client.connack'}, + {emqx_client_connected, counter, 'client.connected'}, + {emqx_client_authenticate, counter, 'client.authenticate'}, + {emqx_client_auth_anonymous, counter, 'client.auth.anonymous'}, + {emqx_client_authorize, counter, 'client.authorize'}, + {emqx_client_subscribe, counter, 'client.subscribe'}, + {emqx_client_unsubscribe, counter, 'client.unsubscribe'}, + {emqx_client_disconnected, counter, 'client.disconnected'} + ]. + +%%========== +%% Metrics - session +session_metric_meta() -> + [ + {emqx_session_created, counter, 'session.created'}, + {emqx_session_resumed, counter, 'session.resumed'}, + {emqx_session_takenover, counter, 'session.takenover'}, + {emqx_session_discarded, counter, 'session.discarded'}, + {emqx_session_terminated, counter, 'session.terminated'} + ]. + +%%========== +%% Metrics - acl +acl_metric_meta() -> + [ + {emqx_authorization_allow, counter, 'authorization.allow'}, + {emqx_authorization_deny, counter, 'authorization.deny'}, + {emqx_authorization_cache_hit, counter, 'authorization.cache_hit'}, + {emqx_authorization_cache_miss, counter, 'authorization.cache_miss'}, + {emqx_authorization_superuser, counter, 'authorization.superuser'}, + {emqx_authorization_nomatch, counter, 'authorization.nomatch'}, + {emqx_authorization_matched_allow, counter, 'authorization.matched_allow'}, + {emqx_authorization_matched_deny, counter, 'authorization.matched_deny'} + ]. + +%%========== +%% Metrics - authn +authn_metric_meta() -> + [ + {emqx_authentication_success, counter, 'authentication.success'}, + {emqx_authentication_success_anonymous, counter, 'authentication.success.anonymous'}, + {emqx_authentication_failure, counter, 'authentication.failure'} + ]. + +%%========== +%% Overload Protection +olp_metric_meta() -> + emqx_metrics_olp_meta(emqx_config_zones:is_olp_enabled()). + +emqx_metrics_olp_meta(true) -> + [ + {emqx_overload_protection_delay_ok, counter, 'overload_protection.delay.ok'}, + {emqx_overload_protection_delay_timeout, counter, 'overload_protection.delay.timeout'}, + {emqx_overload_protection_hibernation, counter, 'overload_protection.hibernation'}, + {emqx_overload_protection_gc, counter, 'overload_protection.gc'}, + {emqx_overload_protection_new_conn, counter, 'overload_protection.new_conn'} + ]; +emqx_metrics_olp_meta(false) -> + []. + +%%======================================== +%% License +%%======================================== + -if(?EMQX_RELEASE_EDITION == ee). -emqx_license() -> + +maybe_license_add_collect_family(Callback, RawData) -> + ok = add_collect_family(Callback, license_metric_meta(), ?MG(license_data, RawData)), + ok. + +maybe_license_fetch_data() -> + #{license_data => license_data()}. + +maybe_license_collect_json_data(RawData) -> + #{license => ?MG(license_data, RawData)}. + +%% license +license_metric_meta() -> [ - emqx_license_expiry_at + {emqx_license_expiry_at, gauge, undefined} ]. -emqx_license_data() -> - [ - {expiry_at, emqx_license_checker:expiry_epoch()} - ]. +license_data() -> + #{emqx_license_expiry_at => emqx_license_checker:expiry_epoch()}. + -else. +maybe_license_add_collect_family(_, _) -> + ok. + +maybe_license_fetch_data() -> + #{}. + +maybe_license_collect_json_data(_RawData) -> + #{}. + -endif. -emqx_certs() -> +%%======================================== +%% Certs +%%======================================== + +cert_metric_meta() -> [ - emqx_cert_expiry_at + {emqx_cert_expiry_at, gauge, undefined} ]. -define(LISTENER_TYPES, [ssl, wss, quic]). --spec emqx_certs_data() -> +-spec cert_data() -> [_Point :: {[Label], Epoch}] when Label :: TypeLabel | NameLabel, TypeLabel :: {listener_type, ssl | wss | quic}, NameLabel :: {listener_name, atom()}, Epoch :: non_neg_integer(). -emqx_certs_data() -> - case emqx_config:get([listeners], undefined) of - undefined -> - []; - AllListeners when is_map(AllListeners) -> - lists:foldl( - fun(ListenerType, PointsAcc) -> - PointsAcc ++ - points_of_listeners(ListenerType, AllListeners) - end, - _PointsInitAcc = [], - ?LISTENER_TYPES - ) - end. +cert_data() -> + cert_data(emqx_config:get([listeners], undefined)). + +cert_data(undefined) -> + []; +cert_data(AllListeners) -> + Points = lists:foldl( + fun(ListenerType, PointsAcc) -> + PointsAcc ++ + points_of_listeners(ListenerType, AllListeners) + end, + _PointsInitAcc = [], + ?LISTENER_TYPES + ), + #{ + emqx_cert_expiry_at => Points + }. points_of_listeners(Type, AllListeners) -> do_points_of_listeners(Type, maps:get(Type, AllListeners, undefined)). @@ -803,24 +834,7 @@ do_points_of_listeners(ListenerType, TypeOfListeners) -> ). gen_point(Type, Name, Path) -> - { - %% Labels: [{_Labelkey, _LabelValue}] - [ - {listener_type, Type}, - {listener_name, Name} - ], - %% Value - cert_expiry_at_from_path(Path) - }. - -collect_certs_json(CertsData) -> - lists:foldl( - fun({Labels, Data}, AccIn) -> - [(maps:from_list(Labels))#{emqx_cert_expiry_at => Data} | AccIn] - end, - _InitAcc = [], - CertsData - ). + {[{listener_type, Type}, {listener_name, Name}], cert_expiry_at_from_path(Path)}. %% TODO: cert manager for more generic utils functions cert_expiry_at_from_path(Path0) -> @@ -849,6 +863,59 @@ utc_time_to_datetime(Str) -> date_to_expiry_epoch(DateTime) -> calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. +%%-------------------------------------------------------------------- +%% Collect functions +%%-------------------------------------------------------------------- + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% merge / zip formatting funcs for type `application/json` + +%% always return json array +collect_cert_json_data(Data) -> + collect_json_data_(Data). + +collect_json_data(Data0) -> + DataListPerNode = collect_json_data_(Data0), + case {?GET_PROM_DATA_MODE(), DataListPerNode} of + %% all nodes results unaggregated, should be a list + {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, _} -> + DataListPerNode; + %% only local node result [#{...}] + %% To guaranteed compatibility, return a json object, not array + {?PROM_DATA_MODE__NODE, [NData | _]} -> + NData; + %% All nodes results aggregated + %% return a json object, not array + {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, [NData | _]} -> + NData; + %% olp maybe not enabled, with empty list to empty object + {_, []} -> + #{} + end. + +collect_json_data_(Data) -> + emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_prom_stats_metrics/3). + +zip_json_prom_stats_metrics(Key, Points, [] = _AccIn) -> + lists:foldl( + fun({Lables, Metric}, AccIn2) -> + LablesKVMap = maps:from_list(Lables), + Point = LablesKVMap#{Key => Metric}, + [Point | AccIn2] + end, + [], + Points + ); +zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) -> + ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), + lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). + +metrics_name(MetricsAll) -> + [Name || {Name, _, _} <- MetricsAll]. + +%%-------------------------------------------------------------------- +%% bpapi + %% deprecated_since 5.0.10, remove this when 5.1.x do_start() -> emqx_prometheus_sup:start_child(?APP). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index 5fa9057da..0d0607518 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -81,43 +81,6 @@ -define(MG0(K, MAP), maps:get(K, MAP, 0)). -define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). --define(AUTHNS_WITH_TYPE, [ - {emqx_authn_enable, gauge}, - {emqx_authn_status, gauge}, - {emqx_authn_nomatch, counter}, - {emqx_authn_total, counter}, - {emqx_authn_success, counter}, - {emqx_authn_failed, counter} -]). - --define(AUTHZS_WITH_TYPE, [ - {emqx_authz_enable, gauge}, - {emqx_authz_status, gauge}, - {emqx_authz_nomatch, counter}, - {emqx_authz_total, counter}, - {emqx_authz_success, counter}, - {emqx_authz_failed, counter} -]). - --define(AUTHN_USERS_COUNT_WITH_TYPE, [ - {emqx_authn_users_count, gauge} -]). - --define(AUTHZ_RULES_COUNT_WITH_TYPE, [ - {emqx_authz_rules_count, gauge} -]). - --define(BANNED_WITH_TYPE, [ - {emqx_banned_count, gauge} -]). - --define(LOGICAL_SUM_METRIC_NAMES, [ - emqx_authn_enable, - emqx_authn_status, - emqx_authz_enable, - emqx_authz_status -]). - %%-------------------------------------------------------------------- %% Collector API %%-------------------------------------------------------------------- @@ -132,11 +95,11 @@ deregister_cleanup(_) -> ok. %% erlfmt-ignore collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), - ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)), - ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)), - ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)), - ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, ?MG(authz_rules_count, RawData)), - ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, ?MG(banned_count, RawData)), + ok = add_collect_family(Callback, authn_metric_meta(), ?MG(authn_data, RawData)), + ok = add_collect_family(Callback, authn_users_count_metric_meta(), ?MG(authn_users_count_data, RawData)), + ok = add_collect_family(Callback, authz_metric_meta(), ?MG(authz_data, RawData)), + ok = add_collect_family(Callback, authz_rules_count_metric_meta(), ?MG(authz_rules_count_data, RawData)), + ok = add_collect_family(Callback, banned_count_metric_meta(), ?MG(banned_count_data, RawData)), ok; collect_mf(_, _) -> ok. @@ -145,8 +108,8 @@ collect_mf(_, _) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), #{ - emqx_authn => collect_json_data(?MG(authn, RawData)), - emqx_authz => collect_json_data(?MG(authz, RawData)), + emqx_authn => collect_json_data(?MG(authn_data, RawData)), + emqx_authz => collect_json_data(?MG(authz_data, RawData)), emqx_banned => collect_banned_data() }; collect(<<"prometheus">>) -> @@ -165,25 +128,30 @@ collect_metrics(Name, Metrics) -> %% behaviour fetch_data_from_local_node() -> {node(self()), #{ - authn => authn_data(), - authz => authz_data() + authn_data => authn_data(), + authz_data => authz_data() }}. fetch_cluster_consistented_data() -> #{ - authn_users_count => authn_users_count_data(), - authz_rules_count => authz_rules_count_data(), - banned_count => banned_count_data() + authn_users_count_data => authn_users_count_data(), + authz_rules_count_data => authz_rules_count_data(), + banned_count_data => banned_count_data() }. aggre_or_zip_init_acc() -> #{ - authn => maps:from_keys(authn_metric_names(), []), - authz => maps:from_keys(authz_metric_names(), []) + authn_data => maps:from_keys(authn_metric(names), []), + authz_data => maps:from_keys(authz_metric(names), []) }. logic_sum_metrics() -> - ?LOGICAL_SUM_METRIC_NAMES. + [ + emqx_authn_enable, + emqx_authn_status, + emqx_authz_enable, + emqx_authz_status + ]. %%-------------------------------------------------------------------- %% Collector @@ -243,6 +211,19 @@ collect_auth(emqx_banned_count, Data) -> %%==================== %% Authn overview +authn_metric_meta() -> + [ + {emqx_authn_enable, gauge}, + {emqx_authn_status, gauge}, + {emqx_authn_nomatch, counter}, + {emqx_authn_total, counter}, + {emqx_authn_success, counter}, + {emqx_authn_failed, counter} + ]. + +authn_metric(names) -> + emqx_prometheus_cluster:metric_names(authn_metric_meta()). + -spec authn_data() -> #{Key => [Point]} when Key :: authn_metric_name(), Point :: {[Label], Metric}, @@ -256,7 +237,7 @@ authn_data() -> AccIn#{Key => authn_backend_to_points(Key, Authns)} end, #{}, - authn_metric_names() + authn_metric(names) ). -spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when @@ -287,15 +268,17 @@ lookup_authn_metrics_local(Id) -> emqx_authn_failed => ?MG0(failed, Counters) }; {error, _Reason} -> - maps:from_keys(authn_metric_names() -- [emqx_authn_enable], 0) + maps:from_keys(authn_metric(names) -- [emqx_authn_enable], 0) end. -authn_metric_names() -> - emqx_prometheus_cluster:metric_names(?AUTHNS_WITH_TYPE). - %%==================== %% Authn users count +authn_users_count_metric_meta() -> + [ + {emqx_authn_users_count, gauge} + ]. + -define(AUTHN_MNESIA, emqx_authn_mnesia). -define(AUTHN_SCRAM_MNESIA, emqx_authn_scram_mnesia). @@ -321,6 +304,19 @@ authn_users_count_data() -> %%==================== %% Authz overview +authz_metric_meta() -> + [ + {emqx_authz_enable, gauge}, + {emqx_authz_status, gauge}, + {emqx_authz_nomatch, counter}, + {emqx_authz_total, counter}, + {emqx_authz_success, counter}, + {emqx_authz_failed, counter} + ]. + +authz_metric(names) -> + emqx_prometheus_cluster:metric_names(authz_metric_meta()). + -spec authz_data() -> #{Key => [Point]} when Key :: authz_metric_name(), Point :: {[Label], Metric}, @@ -334,7 +330,7 @@ authz_data() -> AccIn#{Key => authz_backend_to_points(Key, Authzs)} end, #{}, - authz_metric_names() + authz_metric(names) ). -spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when @@ -365,15 +361,17 @@ lookup_authz_metrics_local(Type) -> emqx_authz_failed => ?MG0(failed, Counters) }; {error, _Reason} -> - maps:from_keys(authz_metric_names() -- [emqx_authz_enable], 0) + maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0) end. -authz_metric_names() -> - emqx_prometheus_cluster:metric_names(?AUTHZS_WITH_TYPE). - %%==================== %% Authz rules count +authz_rules_count_metric_meta() -> + [ + {emqx_authz_rules_count, gauge} + ]. + -define(ACL_TABLE, emqx_acl). authz_rules_count_data() -> @@ -400,7 +398,13 @@ authz_rules_count_data() -> %%==================== %% Banned count --define(BANNED_TABLE, emqx_banned). +banned_count_metric_meta() -> + [ + {emqx_banned_count, gauge} + ]. +-define(BANNED_TABLE, + emqx_banned +). banned_count_data() -> mnesia_size(?BANNED_TABLE). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index bfd011eaa..008a029a8 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -65,65 +65,6 @@ -define(MG(K, MAP), maps:get(K, MAP)). -define(MG0(K, MAP), maps:get(K, MAP, 0)). --define(RULES_WITH_TYPE, [ - {emqx_rules_count, gauge} -]). - --define(CONNECTORS_WITH_TYPE, [ - {emqx_connectors_count, gauge} -]). - --define(RULES_SPECIFIC_WITH_TYPE, [ - {emqx_rule_enable, gauge}, - {emqx_rule_matched, counter}, - {emqx_rule_failed, counter}, - {emqx_rule_passed, counter}, - {emqx_rule_failed_exception, counter}, - {emqx_rule_failed_no_result, counter}, - {emqx_rule_actions_total, counter}, - {emqx_rule_actions_success, counter}, - {emqx_rule_actions_failed, counter}, - {emqx_rule_actions_failed_out_of_service, counter}, - {emqx_rule_actions_failed_unknown, counter} -]). - --define(ACTION_SPECIFIC_WITH_TYPE, [ - {emqx_action_matched, counter}, - {emqx_action_dropped, counter}, - {emqx_action_success, counter}, - {emqx_action_failed, counter}, - {emqx_action_inflight, gauge}, - {emqx_action_received, counter}, - {emqx_action_late_reply, counter}, - {emqx_action_retried, counter}, - {emqx_action_retried_success, counter}, - {emqx_action_retried_failed, counter}, - {emqx_action_dropped_resource_stopped, counter}, - {emqx_action_dropped_resource_not_found, counter}, - {emqx_action_dropped_queue_full, counter}, - {emqx_action_dropped_other, counter}, - {emqx_action_dropped_expired, counter}, - {emqx_action_queuing, gauge} -]). - --define(CONNECTOR_SPECIFIC_WITH_TYPE, [ - {emqx_connector_enable, gauge}, - {emqx_connector_status, gauge} -]). - --if(?EMQX_RELEASE_EDITION == ee). --define(SCHEMA_REGISTRY_WITH_TYPE, [ - emqx_schema_registrys_count -]). --else. --endif. - --define(LOGICAL_SUM_METRIC_NAMES, [ - emqx_rule_enable, - emqx_connector_enable, - emqx_connector_status -]). - %%-------------------------------------------------------------------- %% Callback for emqx_prometheus_cluster %%-------------------------------------------------------------------- @@ -132,28 +73,32 @@ fetch_data_from_local_node() -> Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), {node(self()), #{ - rule_specific_data => rule_specific_data(Rules), - action_specific_data => action_specific_data(Bridges), - connector_specific_data => connector_specific_data(Bridges) + rule_metric_data => rule_metric_data(Rules), + action_metric_data => action_metric_data(Bridges), + connector_metric_data => connector_metric_data(Bridges) }}. fetch_cluster_consistented_data() -> Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), (maybe_collect_schema_registry())#{ - rules_data => rules_data(Rules), - connectors_data => connectors_data(Bridges) + rules_ov_data => rules_ov_data(Rules), + connectors_ov_data => connectors_ov_data(Bridges) }. aggre_or_zip_init_acc() -> #{ - rule_specific_data => maps:from_keys(rule_specific_metric_names(), []), - action_specific_data => maps:from_keys(action_specific_metric_names(), []), - connector_specific_data => maps:from_keys(connectr_specific_metric_names(), []) + rule_metric_data => maps:from_keys(rule_metric(names), []), + action_metric_data => maps:from_keys(action_metric(names), []), + connector_metric_data => maps:from_keys(connectr_metric(names), []) }. logic_sum_metrics() -> - ?LOGICAL_SUM_METRIC_NAMES. + [ + emqx_rule_enable, + emqx_connector_enable, + emqx_connector_status + ]. %%-------------------------------------------------------------------- %% Collector API @@ -170,21 +115,23 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), %% Data Integration Overview - ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)), - ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, ?MG(connectors_data, RawData)), + ok = add_collect_family(Callback, rules_ov_metric_meta(), ?MG(rules_ov_data, RawData)), + ok = add_collect_family( + Callback, connectors_ov_metric_meta(), ?MG(connectors_ov_data, RawData) + ), ok = maybe_collect_family_schema_registry(Callback), - %% Rule Specific - RuleSpecificDs = ?MG(rule_specific_data, RawData), - ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, RuleSpecificDs), + %% Rule Metric + RuleMetricDs = ?MG(rule_metric_data, RawData), + ok = add_collect_family(Callback, rule_metric_meta(), RuleMetricDs), - %% Action Specific - ActionSpecificDs = ?MG(action_specific_data, RawData), - ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, ActionSpecificDs), + %% Action Metric + ActionMetricDs = ?MG(action_metric_data, RawData), + ok = add_collect_family(Callback, action_metric_meta(), ActionMetricDs), - %% Connector Specific - ConnectorSpecificDs = ?MG(connector_specific_data, RawData), - ok = add_collect_family(Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, ConnectorSpecificDs), + %% Connector Metric + ConnectorMetricDs = ?MG(connector_metric_data, RawData), + ok = add_collect_family(Callback, connector_metric_meta(), ConnectorMetricDs), ok; collect_mf(_, _) -> @@ -197,9 +144,9 @@ collect(<<"json">>) -> Bridges = emqx_bridge:list(), #{ data_integration_overview => collect_data_integration_overview(Rules, Bridges), - rules => collect_json_data(?MG(rule_specific_data, RawData)), - actions => collect_json_data(?MG(action_specific_data, RawData)), - connectors => collect_json_data(?MG(connector_specific_data, RawData)) + rules => collect_json_data(?MG(rule_metric_data, RawData)), + actions => collect_json_data(?MG(action_metric_data, RawData)), + connectors => collect_json_data(?MG(connector_metric_data, RawData)) }; collect(<<"prometheus">>) -> prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY). @@ -218,21 +165,6 @@ add_collect_family(Name, Data, Callback, Type) -> collect_metrics(Name, Metrics) -> collect_di(Name, Metrics). --if(?EMQX_RELEASE_EDITION == ee). -maybe_collect_family_schema_registry(Callback) -> - ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()), - ok. - -maybe_collect_schema_registry() -> - schema_registry_data(). --else. -maybe_collect_family_schema_registry(_) -> - ok. - -maybe_collect_schema_registry() -> - #{}. --endif. - %%-------------------------------------------------------------------- %% Collector %%-------------------------------------------------------------------- @@ -244,88 +176,54 @@ maybe_collect_schema_registry() -> %%==================== %% All Rules %% Rules -collect_di(K = emqx_rules_count, Data) -> - gauge_metric(?MG(K, Data)); +collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data)); %%==================== %% Schema Registry -collect_di(K = emqx_schema_registrys_count, Data) -> - gauge_metric(?MG(K, Data)); +collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data)); %%==================== %% Connectors -collect_di(K = emqx_connectors_count, Data) -> - gauge_metric(?MG(K, Data)); +collect_di(K = emqx_connectors_count, Data) -> gauge_metric(?MG(K, Data)); %%======================================== -%% Data Integration for Specific: Rule && Action && Connector +%% Data Integration Metric for: Rule && Action && Connector %%======================================== %%==================== -%% Specific Rule -collect_di(K = emqx_rule_enable, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_matched, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_failed, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_passed, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_failed_exception, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_failed_no_result, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_actions_total, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_actions_success, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_actions_failed, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_actions_failed_unknown, Data) -> - counter_metrics(?MG(K, Data)); +%% Rule Metric +collect_di(K = emqx_rule_enable, Data) -> gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_matched, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_passed, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed_exception, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_failed_no_result, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_total, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_success, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data)); %%==================== -%% Specific Action - -collect_di(K = emqx_action_matched, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_success, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_failed, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_inflight, Data) -> - %% inflight type: gauge - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_action_received, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_late_reply, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_retried, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_retried_success, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_retried_failed, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped_resource_stopped, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped_resource_not_found, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped_queue_full, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped_other, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_dropped_expired, Data) -> - counter_metrics(?MG(K, Data)); -collect_di(K = emqx_action_queuing, Data) -> - %% queuing type: gauge - gauge_metrics(?MG(K, Data)); +%% Action Metric +collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_failed, Data) -> counter_metrics(?MG(K, Data)); +%% inflight type: gauge +collect_di(K = emqx_action_inflight, Data) -> gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_received, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_late_reply, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried_success, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_retried_failed, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_resource_stopped, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_resource_not_found, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_queue_full, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_other, Data) -> counter_metrics(?MG(K, Data)); +collect_di(K = emqx_action_dropped_expired, Data) -> counter_metrics(?MG(K, Data)); +%% queuing type: gauge +collect_di(K = emqx_action_queuing, Data) -> gauge_metrics(?MG(K, Data)); %%==================== -%% Specific Connector - -collect_di(K = emqx_connector_enable, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_connector_status, Data) -> - gauge_metrics(?MG(K, Data)). +%% Connector Metric +collect_di(K = emqx_connector_enable, Data) -> gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_connector_status, Data) -> gauge_metrics(?MG(K, Data)). %%-------------------------------------------------------------------- %% Internal functions @@ -338,8 +236,16 @@ collect_di(K = emqx_connector_status, Data) -> %%==================== %% All Rules +rules_ov_metric_meta() -> + [ + {emqx_rules_count, gauge} + ]. + +rules_ov_metric(names) -> + emqx_prometheus_cluster:metric_names(rules_ov_metric_meta()). + -define(RULE_TAB, emqx_rule_engine). -rules_data(_Rules) -> +rules_ov_data(_Rules) -> #{ emqx_rules_count => ets:info(?RULE_TAB, size) }. @@ -348,36 +254,83 @@ rules_data(_Rules) -> %% Schema Registry -if(?EMQX_RELEASE_EDITION == ee). + +maybe_collect_family_schema_registry(Callback) -> + ok = add_collect_family(Callback, schema_registry_metric_meta(), schema_registry_data()), + ok. + +schema_registry_metric_meta() -> + [ + {emqx_schema_registrys_count, gauge} + ]. + schema_registry_data() -> #{ emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas()) }. + +maybe_collect_schema_registry() -> + schema_registry_data(). + -else. + +maybe_collect_family_schema_registry(_) -> + ok. + +maybe_collect_schema_registry() -> + #{}. + -endif. %%==================== %% Connectors -connectors_data(Brdiges) -> +connectors_ov_metric_meta() -> + [ + {emqx_connectors_count, gauge} + ]. + +connectors_ov_metric(names) -> + emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). + +connectors_ov_data(Brdiges) -> #{ %% Both Bridge V1 and V2 emqx_connectors_count => erlang:length(Brdiges) }. %%======================================== -%% Data Integration for Specific: Rule && Action && Connector +%% Data Integration Metric for: Rule && Action && Connector %%======================================== %%==================== -%% Specific Rule +%% Rule Metric %% With rule_id as label key: `rule_id` -rule_specific_data(Rules) -> +rule_metric_meta() -> + [ + {emqx_rule_enable, gauge}, + {emqx_rule_matched, counter}, + {emqx_rule_failed, counter}, + {emqx_rule_passed, counter}, + {emqx_rule_failed_exception, counter}, + {emqx_rule_failed_no_result, counter}, + {emqx_rule_actions_total, counter}, + {emqx_rule_actions_success, counter}, + {emqx_rule_actions_failed, counter}, + {emqx_rule_actions_failed_out_of_service, counter}, + {emqx_rule_actions_failed_unknown, counter} + ]. + +rule_metric(names) -> + emqx_prometheus_cluster:metric_names(rule_metric_meta()). + +rule_metric_data(Rules) -> lists:foldl( fun(#{id := Id} = Rule, AccIn) -> merge_acc_with_rules(Id, get_metric(Rule), AccIn) end, - maps:from_keys(rule_specific_metric_names(), []), + maps:from_keys(rule_metric(names), []), Rules ). @@ -413,20 +366,40 @@ get_metric(#{id := Id, enable := Bool} = _Rule) -> } end. -rule_specific_metric_names() -> - emqx_prometheus_cluster:metric_names(?RULES_SPECIFIC_WITH_TYPE). - %%==================== -%% Specific Action +%% Action Metric %% With action_id: `{type}:{name}` as label key: `action_id` -action_specific_data(Bridges) -> +action_metric_meta() -> + [ + {emqx_action_matched, counter}, + {emqx_action_dropped, counter}, + {emqx_action_success, counter}, + {emqx_action_failed, counter}, + {emqx_action_inflight, gauge}, + {emqx_action_received, counter}, + {emqx_action_late_reply, counter}, + {emqx_action_retried, counter}, + {emqx_action_retried_success, counter}, + {emqx_action_retried_failed, counter}, + {emqx_action_dropped_resource_stopped, counter}, + {emqx_action_dropped_resource_not_found, counter}, + {emqx_action_dropped_queue_full, counter}, + {emqx_action_dropped_other, counter}, + {emqx_action_dropped_expired, counter}, + {emqx_action_queuing, gauge} + ]. + +action_metric(names) -> + emqx_prometheus_cluster:metric_names(action_metric_meta()). + +action_metric_data(Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = _Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) end, - maps:from_keys(action_specific_metric_names(), []), + maps:from_keys(action_metric(names), []), Bridges ). @@ -467,20 +440,26 @@ get_bridge_metric(Type, Name) -> } end. -action_specific_metric_names() -> - emqx_prometheus_cluster:metric_names(?ACTION_SPECIFIC_WITH_TYPE). - %%==================== -%% Specific Connector +%% Connector Metric %% With connector_id: `{type}:{name}` as label key: `connector_id` -connector_specific_data(Bridges) -> +connector_metric_meta() -> + [ + {emqx_connector_enable, gauge}, + {emqx_connector_status, gauge} + ]. + +connectr_metric(names) -> + emqx_prometheus_cluster:metric_names(connector_metric_meta()). + +connector_metric_data(Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) end, - maps:from_keys(connectr_specific_metric_names(), []), + maps:from_keys(connectr_metric(names), []), Bridges ). @@ -504,9 +483,6 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) -> emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status) }. -connectr_specific_metric_names() -> - emqx_prometheus_cluster:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE). - %%-------------------------------------------------------------------- %% Collect functions %%-------------------------------------------------------------------- @@ -514,18 +490,18 @@ connectr_specific_metric_names() -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% merge / zip formatting funcs for type `application/json` collect_data_integration_overview(Rules, Bridges) -> - RulesD = rules_data(Rules), - ConnectorsD = connectors_data(Bridges), + RulesD = rules_ov_data(Rules), + ConnectorsD = connectors_ov_data(Bridges), M1 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, - emqx_prometheus_cluster:metric_names(?RULES_WITH_TYPE) + rules_ov_metric(names) ), M2 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, - emqx_prometheus_cluster:metric_names(?CONNECTORS_WITH_TYPE) + connectors_ov_metric(names) ), M3 = maybe_collect_schema_registry(),