diff --git a/apps/emqx/src/config/emqx_config_zones.erl b/apps/emqx/src/config/emqx_config_zones.erl index 57e2824ff..c367e2198 100644 --- a/apps/emqx/src/config/emqx_config_zones.erl +++ b/apps/emqx/src/config/emqx_config_zones.erl @@ -19,6 +19,7 @@ %% API -export([add_handler/0, remove_handler/0, pre_config_update/3]). +-export([is_olp_enabled/0]). -define(ZONES, [zones]). @@ -33,3 +34,13 @@ remove_handler() -> %% replace the old config with the new config pre_config_update(?ZONES, NewRaw, _OldRaw) -> {ok, NewRaw}. + +is_olp_enabled() -> + maps:fold( + fun + (_, #{overload_protection := #{enable := true}}, _Acc) -> true; + (_, _, Acc) -> Acc + end, + false, + emqx_config:get([zones], #{}) + ). diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 6b46af257..5f5deadd3 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -67,6 +67,7 @@ terminate/2, code_change/3 ]). +-export([olp_metrics/0]). %% BACKW: v4.3.0 -export([upgrade_retained_delayed_counter_type/0]). @@ -269,13 +270,16 @@ %% Overload protection counters -define(OLP_METRICS, [ - {counter, 'olp.delay.ok'}, - {counter, 'olp.delay.timeout'}, - {counter, 'olp.hbn'}, - {counter, 'olp.gc'}, - {counter, 'olp.new_conn'} + {counter, 'overload_protection.delay.ok'}, + {counter, 'overload_protection.delay.timeout'}, + {counter, 'overload_protection.hibernation'}, + {counter, 'overload_protection.gc'}, + {counter, 'overload_protection.new_conn'} ]). +olp_metrics() -> + lists:map(fun({_, Metric}) -> Metric end, ?OLP_METRICS). + -record(state, {next_idx = 1}). -record(metric, {name, type, idx}). @@ -701,9 +705,9 @@ reserved_idx('authorization.cache_hit') -> 302; reserved_idx('authentication.success') -> 310; reserved_idx('authentication.success.anonymous') -> 311; reserved_idx('authentication.failure') -> 312; -reserved_idx('olp.delay.ok') -> 400; -reserved_idx('olp.delay.timeout') -> 401; -reserved_idx('olp.hbn') -> 402; -reserved_idx('olp.gc') -> 403; -reserved_idx('olp.new_conn') -> 404; +reserved_idx('overload_protection.delay.ok') -> 400; +reserved_idx('overload_protection.delay.timeout') -> 401; +reserved_idx('overload_protection.hibernation') -> 402; +reserved_idx('overload_protection.gc') -> 403; +reserved_idx('overload_protection.new_conn') -> 404; reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/emqx_olp.erl b/apps/emqx/src/emqx_olp.erl index 5a4775896..5ca35d8b4 100644 --- a/apps/emqx/src/emqx_olp.erl +++ b/apps/emqx/src/emqx_olp.erl @@ -38,11 +38,11 @@ | backoff_new_conn. -type cnt_name() :: - 'olp.delay.ok' - | 'olp.delay.timeout' - | 'olp.hbn' - | 'olp.gc' - | 'olp.new_conn'. + 'overload_protection.delay.ok' + | 'overload_protection.delay.timeout' + | 'overload_protection.hibernation' + | 'overload_protection.gc' + | 'overload_protection.new_conn'. -define(overload_protection, overload_protection). @@ -63,10 +63,10 @@ backoff(Zone) -> false -> false; ok -> - emqx_metrics:inc('olp.delay.ok'), + emqx_metrics:inc('overload_protection.delay.ok'), ok; timeout -> - emqx_metrics:inc('olp.delay.timeout'), + emqx_metrics:inc('overload_protection.delay.timeout'), timeout end; _ -> @@ -76,18 +76,18 @@ backoff(Zone) -> %% @doc If forceful GC should be skipped when the system is overloaded. -spec backoff_gc(Zone :: atom()) -> boolean(). backoff_gc(Zone) -> - do_check(Zone, ?FUNCTION_NAME, 'olp.gc'). + do_check(Zone, ?FUNCTION_NAME, 'overload_protection.gc'). %% @doc If hibernation should be skipped when the system is overloaded. -spec backoff_hibernation(Zone :: atom()) -> boolean(). backoff_hibernation(Zone) -> - do_check(Zone, ?FUNCTION_NAME, 'olp.hbn'). + do_check(Zone, ?FUNCTION_NAME, 'overload_protection.hibernation'). %% @doc Returns {error, overloaded} if new connection should be %% closed when system is overloaded. -spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}. backoff_new_conn(Zone) -> - case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of + case do_check(Zone, ?FUNCTION_NAME, 'overload_protection.new_conn') of true -> {error, overloaded}; false -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index a77ec28f2..7ddf05af3 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -118,7 +118,7 @@ new_conn( {stop, stream_accept_error, S} end; true -> - emqx_metrics:inc('olp.new_conn'), + emqx_metrics:inc('overload_protection.new_conn'), _ = quicer:async_shutdown_connection( Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index e590577da..dfd3115f0 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -177,7 +177,9 @@ names() -> emqx_subscriptions_shared_count, emqx_subscriptions_shared_max, emqx_retained_count, - emqx_retained_max + emqx_retained_max, + emqx_delayed_count, + emqx_delayed_max ]. %% @doc Get stats by name. diff --git a/apps/emqx_opentelemetry/src/emqx_otel.erl b/apps/emqx_opentelemetry/src/emqx_otel.erl index a9d9af0ea..408fd4c58 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel.erl @@ -85,10 +85,32 @@ create_metric_views() -> create_gauge(Meter, VmGauge, fun ?MODULE:get_vm_gauge/1), ClusterGauge = [{'node.running', 0}, {'node.stopped', 0}], create_gauge(Meter, ClusterGauge, fun ?MODULE:get_cluster_gauge/1), - Metrics = lists:map(fun({K, V}) -> {K, V, unit(K)} end, emqx_metrics:all()), + Metrics0 = filter_olp_metrics(emqx_metrics:all()), + Metrics = lists:map(fun({K, V}) -> {to_metric_name(K), V, unit(K)} end, Metrics0), create_counter(Meter, Metrics, fun ?MODULE:get_metric_counter/1), ok. +filter_olp_metrics(Metrics) -> + case emqx_config_zones:is_olp_enabled() of + true -> + Metrics; + false -> + OlpMetrics = emqx_metrics:olp_metrics(), + lists:filter( + fun({K, _}) -> + not lists:member(K, OlpMetrics) + end, + Metrics + ) + end. + +to_metric_name('messages.dropped.await_pubrel_timeout') -> + 'messages.dropped.expired'; +to_metric_name('packets.connect.received') -> + 'packets.connect'; +to_metric_name(Name) -> + Name. + unit(K) -> case lists:member(K, bytes_metrics()) of true -> kb; diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index ac902ca55..e9030d3ed 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -168,6 +168,9 @@ collect_mf(_Registry, Callback) -> _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_delivery()], _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_client()], _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_session()], + _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_olp()], + _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_acl()], + _ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_authn()], ok. %% @private @@ -228,6 +231,10 @@ emqx_collect(emqx_sessions_count, Stats) -> gauge_metric(?C('sessions.count', Stats)); emqx_collect(emqx_sessions_max, Stats) -> gauge_metric(?C('sessions.max', Stats)); +emqx_collect(emqx_channels_count, Stats) -> + gauge_metric(?C('channels.count', Stats)); +emqx_collect(emqx_channels_max, Stats) -> + gauge_metric(?C('channels.max', Stats)); %% pub/sub stats emqx_collect(emqx_topics_count, Stats) -> gauge_metric(?C('topics.count', Stats)); @@ -254,6 +261,11 @@ emqx_collect(emqx_retained_count, Stats) -> gauge_metric(?C('retained.count', Stats)); emqx_collect(emqx_retained_max, Stats) -> gauge_metric(?C('retained.max', Stats)); +%% delayed +emqx_collect(emqx_delayed_count, Stats) -> + gauge_metric(?C('delayed.count', Stats)); +emqx_collect(emqx_delayed_max, Stats) -> + gauge_metric(?C('delayed.max', Stats)); %%-------------------------------------------------------------------- %% Metrics - packets & bytes @@ -408,7 +420,10 @@ emqx_collect(emqx_delivery_dropped_expired, Stats) -> counter_metric(?C('delivery.dropped.expired', Stats)); %%-------------------------------------------------------------------- %% Metrics - client - +emqx_collect(emqx_client_connect, Stats) -> + counter_metric(?C('client.connect', Stats)); +emqx_collect(emqx_client_connack, Stats) -> + counter_metric(?C('client.connack', Stats)); emqx_collect(emqx_client_connected, Stats) -> counter_metric(?C('client.connected', Stats)); emqx_collect(emqx_client_authenticate, Stats) -> @@ -437,6 +452,43 @@ emqx_collect(emqx_session_discarded, Stats) -> emqx_collect(emqx_session_terminated, Stats) -> counter_metric(?C('session.terminated', Stats)); %%-------------------------------------------------------------------- + +%% Metrics - overload protection +emqx_collect(emqx_overload_protection_delay_ok, Stats) -> + counter_metric(?C('overload_protection.delay.ok', Stats)); +emqx_collect(emqx_overload_protection_delay_timeout, Stats) -> + counter_metric(?C('overload_protection.delay.timeout', Stats)); +emqx_collect(emqx_overload_protection_hibernation, Stats) -> + counter_metric(?C('overload_protection.hibernation', Stats)); +emqx_collect(emqx_overload_protection_gc, Stats) -> + counter_metric(?C('overload_protection.gc', Stats)); +emqx_collect(emqx_overload_protection_new_conn, Stats) -> + counter_metric(?C('overload_protection.new_conn', Stats)); +%%-------------------------------------------------------------------- +%% Metrics - acl +emqx_collect(emqx_authorization_allow, Stats) -> + counter_metric(?C('authorization.allow', Stats)); +emqx_collect(emqx_authorization_deny, Stats) -> + counter_metric(?C('authorization.deny', Stats)); +emqx_collect(emqx_authorization_cache_hit, Stats) -> + counter_metric(?C('authorization.cache_hit', Stats)); +emqx_collect(emqx_authorization_superuser, Stats) -> + counter_metric(?C('authorization.superuser', Stats)); +emqx_collect(emqx_authorization_nomatch, Stats) -> + counter_metric(?C('authorization.nomatch', Stats)); +emqx_collect(emqx_authorization_matched_allow, Stats) -> + counter_metric(?C('authorization.matched_allow', Stats)); +emqx_collect(emqx_authorization_matched_deny, Stats) -> + counter_metric(?C('authorization.matched_deny', Stats)); +%%-------------------------------------------------------------------- +%% Metrics - authn +emqx_collect(emqx_authentication_success, Stats) -> + counter_metric(?C('authentication.success', Stats)); +emqx_collect(emqx_authentication_success_anonymous, Stats) -> + counter_metric(?C('authentication.success.anonymous', Stats)); +emqx_collect(emqx_authentication_failure, Stats) -> + counter_metric(?C('authentication.failure', Stats)); +%%-------------------------------------------------------------------- %% VM emqx_collect(emqx_vm_cpu_use, VMData) -> @@ -506,6 +558,38 @@ emqx_metrics_packets() -> emqx_packets_auth_sent ]. +emqx_metrics_olp() -> + case emqx_config_zones:is_olp_enabled() of + true -> + [ + emqx_overload_protection_delay_ok, + emqx_overload_protection_delay_timeout, + emqx_overload_protection_hibernation, + emqx_overload_protection_gc, + emqx_overload_protection_new_conn + ]; + false -> + [] + end. + +emqx_metrics_acl() -> + [ + emqx_authorization_allow, + emqx_authorization_deny, + emqx_authorization_cache_hit, + emqx_authorization_superuser, + emqx_authorization_nomatch, + emqx_authorization_matched_allow, + emqx_authorization_matched_deny + ]. + +emqx_metrics_authn() -> + [ + emqx_authentication_success, + emqx_authentication_success_anonymous, + emqx_authentication_failure + ]. + emqx_metrics_messages() -> [ emqx_messages_received, @@ -539,6 +623,8 @@ emqx_metrics_delivery() -> emqx_metrics_client() -> [ + emqx_client_connect, + emqx_client_connack, emqx_client_connected, emqx_client_authenticate, emqx_client_auth_anonymous, diff --git a/changes/ce/feat-11497.en.md b/changes/ce/feat-11497.en.md new file mode 100644 index 000000000..30ef73fb1 --- /dev/null +++ b/changes/ce/feat-11497.en.md @@ -0,0 +1,2 @@ +Enhanced broker metrics collection and export by adding new metrics for messages, overload protection, authorization, authentication, +and improving naming consistency for OpenTelemetry.