Merge pull request #11497 from zhongwencool/prometheus-metrics
Prometheus metrics
This commit is contained in:
commit
2322b27542
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([add_handler/0, remove_handler/0, pre_config_update/3]).
|
-export([add_handler/0, remove_handler/0, pre_config_update/3]).
|
||||||
|
-export([is_olp_enabled/0]).
|
||||||
|
|
||||||
-define(ZONES, [zones]).
|
-define(ZONES, [zones]).
|
||||||
|
|
||||||
|
@ -33,3 +34,13 @@ remove_handler() ->
|
||||||
%% replace the old config with the new config
|
%% replace the old config with the new config
|
||||||
pre_config_update(?ZONES, NewRaw, _OldRaw) ->
|
pre_config_update(?ZONES, NewRaw, _OldRaw) ->
|
||||||
{ok, NewRaw}.
|
{ok, NewRaw}.
|
||||||
|
|
||||||
|
is_olp_enabled() ->
|
||||||
|
maps:fold(
|
||||||
|
fun
|
||||||
|
(_, #{overload_protection := #{enable := true}}, _Acc) -> true;
|
||||||
|
(_, _, Acc) -> Acc
|
||||||
|
end,
|
||||||
|
false,
|
||||||
|
emqx_config:get([zones], #{})
|
||||||
|
).
|
||||||
|
|
|
@ -67,6 +67,7 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
-export([olp_metrics/0]).
|
||||||
|
|
||||||
%% BACKW: v4.3.0
|
%% BACKW: v4.3.0
|
||||||
-export([upgrade_retained_delayed_counter_type/0]).
|
-export([upgrade_retained_delayed_counter_type/0]).
|
||||||
|
@ -269,13 +270,16 @@
|
||||||
|
|
||||||
%% Overload protection counters
|
%% Overload protection counters
|
||||||
-define(OLP_METRICS, [
|
-define(OLP_METRICS, [
|
||||||
{counter, 'olp.delay.ok'},
|
{counter, 'overload_protection.delay.ok'},
|
||||||
{counter, 'olp.delay.timeout'},
|
{counter, 'overload_protection.delay.timeout'},
|
||||||
{counter, 'olp.hbn'},
|
{counter, 'overload_protection.hibernation'},
|
||||||
{counter, 'olp.gc'},
|
{counter, 'overload_protection.gc'},
|
||||||
{counter, 'olp.new_conn'}
|
{counter, 'overload_protection.new_conn'}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
olp_metrics() ->
|
||||||
|
lists:map(fun({_, Metric}) -> Metric end, ?OLP_METRICS).
|
||||||
|
|
||||||
-record(state, {next_idx = 1}).
|
-record(state, {next_idx = 1}).
|
||||||
|
|
||||||
-record(metric, {name, type, idx}).
|
-record(metric, {name, type, idx}).
|
||||||
|
@ -701,9 +705,9 @@ reserved_idx('authorization.cache_hit') -> 302;
|
||||||
reserved_idx('authentication.success') -> 310;
|
reserved_idx('authentication.success') -> 310;
|
||||||
reserved_idx('authentication.success.anonymous') -> 311;
|
reserved_idx('authentication.success.anonymous') -> 311;
|
||||||
reserved_idx('authentication.failure') -> 312;
|
reserved_idx('authentication.failure') -> 312;
|
||||||
reserved_idx('olp.delay.ok') -> 400;
|
reserved_idx('overload_protection.delay.ok') -> 400;
|
||||||
reserved_idx('olp.delay.timeout') -> 401;
|
reserved_idx('overload_protection.delay.timeout') -> 401;
|
||||||
reserved_idx('olp.hbn') -> 402;
|
reserved_idx('overload_protection.hibernation') -> 402;
|
||||||
reserved_idx('olp.gc') -> 403;
|
reserved_idx('overload_protection.gc') -> 403;
|
||||||
reserved_idx('olp.new_conn') -> 404;
|
reserved_idx('overload_protection.new_conn') -> 404;
|
||||||
reserved_idx(_) -> undefined.
|
reserved_idx(_) -> undefined.
|
||||||
|
|
|
@ -38,11 +38,11 @@
|
||||||
| backoff_new_conn.
|
| backoff_new_conn.
|
||||||
|
|
||||||
-type cnt_name() ::
|
-type cnt_name() ::
|
||||||
'olp.delay.ok'
|
'overload_protection.delay.ok'
|
||||||
| 'olp.delay.timeout'
|
| 'overload_protection.delay.timeout'
|
||||||
| 'olp.hbn'
|
| 'overload_protection.hibernation'
|
||||||
| 'olp.gc'
|
| 'overload_protection.gc'
|
||||||
| 'olp.new_conn'.
|
| 'overload_protection.new_conn'.
|
||||||
|
|
||||||
-define(overload_protection, overload_protection).
|
-define(overload_protection, overload_protection).
|
||||||
|
|
||||||
|
@ -63,10 +63,10 @@ backoff(Zone) ->
|
||||||
false ->
|
false ->
|
||||||
false;
|
false;
|
||||||
ok ->
|
ok ->
|
||||||
emqx_metrics:inc('olp.delay.ok'),
|
emqx_metrics:inc('overload_protection.delay.ok'),
|
||||||
ok;
|
ok;
|
||||||
timeout ->
|
timeout ->
|
||||||
emqx_metrics:inc('olp.delay.timeout'),
|
emqx_metrics:inc('overload_protection.delay.timeout'),
|
||||||
timeout
|
timeout
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -76,18 +76,18 @@ backoff(Zone) ->
|
||||||
%% @doc If forceful GC should be skipped when the system is overloaded.
|
%% @doc If forceful GC should be skipped when the system is overloaded.
|
||||||
-spec backoff_gc(Zone :: atom()) -> boolean().
|
-spec backoff_gc(Zone :: atom()) -> boolean().
|
||||||
backoff_gc(Zone) ->
|
backoff_gc(Zone) ->
|
||||||
do_check(Zone, ?FUNCTION_NAME, 'olp.gc').
|
do_check(Zone, ?FUNCTION_NAME, 'overload_protection.gc').
|
||||||
|
|
||||||
%% @doc If hibernation should be skipped when the system is overloaded.
|
%% @doc If hibernation should be skipped when the system is overloaded.
|
||||||
-spec backoff_hibernation(Zone :: atom()) -> boolean().
|
-spec backoff_hibernation(Zone :: atom()) -> boolean().
|
||||||
backoff_hibernation(Zone) ->
|
backoff_hibernation(Zone) ->
|
||||||
do_check(Zone, ?FUNCTION_NAME, 'olp.hbn').
|
do_check(Zone, ?FUNCTION_NAME, 'overload_protection.hibernation').
|
||||||
|
|
||||||
%% @doc Returns {error, overloaded} if new connection should be
|
%% @doc Returns {error, overloaded} if new connection should be
|
||||||
%% closed when system is overloaded.
|
%% closed when system is overloaded.
|
||||||
-spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}.
|
-spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}.
|
||||||
backoff_new_conn(Zone) ->
|
backoff_new_conn(Zone) ->
|
||||||
case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of
|
case do_check(Zone, ?FUNCTION_NAME, 'overload_protection.new_conn') of
|
||||||
true ->
|
true ->
|
||||||
{error, overloaded};
|
{error, overloaded};
|
||||||
false ->
|
false ->
|
||||||
|
|
|
@ -118,7 +118,7 @@ new_conn(
|
||||||
{stop, stream_accept_error, S}
|
{stop, stream_accept_error, S}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
emqx_metrics:inc('olp.new_conn'),
|
emqx_metrics:inc('overload_protection.new_conn'),
|
||||||
_ = quicer:async_shutdown_connection(
|
_ = quicer:async_shutdown_connection(
|
||||||
Conn,
|
Conn,
|
||||||
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
|
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
|
||||||
|
|
|
@ -177,7 +177,9 @@ names() ->
|
||||||
emqx_subscriptions_shared_count,
|
emqx_subscriptions_shared_count,
|
||||||
emqx_subscriptions_shared_max,
|
emqx_subscriptions_shared_max,
|
||||||
emqx_retained_count,
|
emqx_retained_count,
|
||||||
emqx_retained_max
|
emqx_retained_max,
|
||||||
|
emqx_delayed_count,
|
||||||
|
emqx_delayed_max
|
||||||
].
|
].
|
||||||
|
|
||||||
%% @doc Get stats by name.
|
%% @doc Get stats by name.
|
||||||
|
|
|
@ -85,10 +85,32 @@ create_metric_views() ->
|
||||||
create_gauge(Meter, VmGauge, fun ?MODULE:get_vm_gauge/1),
|
create_gauge(Meter, VmGauge, fun ?MODULE:get_vm_gauge/1),
|
||||||
ClusterGauge = [{'node.running', 0}, {'node.stopped', 0}],
|
ClusterGauge = [{'node.running', 0}, {'node.stopped', 0}],
|
||||||
create_gauge(Meter, ClusterGauge, fun ?MODULE:get_cluster_gauge/1),
|
create_gauge(Meter, ClusterGauge, fun ?MODULE:get_cluster_gauge/1),
|
||||||
Metrics = lists:map(fun({K, V}) -> {K, V, unit(K)} end, emqx_metrics:all()),
|
Metrics0 = filter_olp_metrics(emqx_metrics:all()),
|
||||||
|
Metrics = lists:map(fun({K, V}) -> {to_metric_name(K), V, unit(K)} end, Metrics0),
|
||||||
create_counter(Meter, Metrics, fun ?MODULE:get_metric_counter/1),
|
create_counter(Meter, Metrics, fun ?MODULE:get_metric_counter/1),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
filter_olp_metrics(Metrics) ->
|
||||||
|
case emqx_config_zones:is_olp_enabled() of
|
||||||
|
true ->
|
||||||
|
Metrics;
|
||||||
|
false ->
|
||||||
|
OlpMetrics = emqx_metrics:olp_metrics(),
|
||||||
|
lists:filter(
|
||||||
|
fun({K, _}) ->
|
||||||
|
not lists:member(K, OlpMetrics)
|
||||||
|
end,
|
||||||
|
Metrics
|
||||||
|
)
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_metric_name('messages.dropped.await_pubrel_timeout') ->
|
||||||
|
'messages.dropped.expired';
|
||||||
|
to_metric_name('packets.connect.received') ->
|
||||||
|
'packets.connect';
|
||||||
|
to_metric_name(Name) ->
|
||||||
|
Name.
|
||||||
|
|
||||||
unit(K) ->
|
unit(K) ->
|
||||||
case lists:member(K, bytes_metrics()) of
|
case lists:member(K, bytes_metrics()) of
|
||||||
true -> kb;
|
true -> kb;
|
||||||
|
|
|
@ -168,6 +168,9 @@ collect_mf(_Registry, Callback) ->
|
||||||
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_delivery()],
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_delivery()],
|
||||||
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_client()],
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_client()],
|
||||||
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_session()],
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_session()],
|
||||||
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_olp()],
|
||||||
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_acl()],
|
||||||
|
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_authn()],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -228,6 +231,10 @@ emqx_collect(emqx_sessions_count, Stats) ->
|
||||||
gauge_metric(?C('sessions.count', Stats));
|
gauge_metric(?C('sessions.count', Stats));
|
||||||
emqx_collect(emqx_sessions_max, Stats) ->
|
emqx_collect(emqx_sessions_max, Stats) ->
|
||||||
gauge_metric(?C('sessions.max', Stats));
|
gauge_metric(?C('sessions.max', Stats));
|
||||||
|
emqx_collect(emqx_channels_count, Stats) ->
|
||||||
|
gauge_metric(?C('channels.count', Stats));
|
||||||
|
emqx_collect(emqx_channels_max, Stats) ->
|
||||||
|
gauge_metric(?C('channels.max', Stats));
|
||||||
%% pub/sub stats
|
%% pub/sub stats
|
||||||
emqx_collect(emqx_topics_count, Stats) ->
|
emqx_collect(emqx_topics_count, Stats) ->
|
||||||
gauge_metric(?C('topics.count', Stats));
|
gauge_metric(?C('topics.count', Stats));
|
||||||
|
@ -254,6 +261,11 @@ emqx_collect(emqx_retained_count, Stats) ->
|
||||||
gauge_metric(?C('retained.count', Stats));
|
gauge_metric(?C('retained.count', Stats));
|
||||||
emqx_collect(emqx_retained_max, Stats) ->
|
emqx_collect(emqx_retained_max, Stats) ->
|
||||||
gauge_metric(?C('retained.max', Stats));
|
gauge_metric(?C('retained.max', Stats));
|
||||||
|
%% delayed
|
||||||
|
emqx_collect(emqx_delayed_count, Stats) ->
|
||||||
|
gauge_metric(?C('delayed.count', Stats));
|
||||||
|
emqx_collect(emqx_delayed_max, Stats) ->
|
||||||
|
gauge_metric(?C('delayed.max', Stats));
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Metrics - packets & bytes
|
%% Metrics - packets & bytes
|
||||||
|
|
||||||
|
@ -408,7 +420,10 @@ emqx_collect(emqx_delivery_dropped_expired, Stats) ->
|
||||||
counter_metric(?C('delivery.dropped.expired', Stats));
|
counter_metric(?C('delivery.dropped.expired', Stats));
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Metrics - client
|
%% Metrics - client
|
||||||
|
emqx_collect(emqx_client_connect, Stats) ->
|
||||||
|
counter_metric(?C('client.connect', Stats));
|
||||||
|
emqx_collect(emqx_client_connack, Stats) ->
|
||||||
|
counter_metric(?C('client.connack', Stats));
|
||||||
emqx_collect(emqx_client_connected, Stats) ->
|
emqx_collect(emqx_client_connected, Stats) ->
|
||||||
counter_metric(?C('client.connected', Stats));
|
counter_metric(?C('client.connected', Stats));
|
||||||
emqx_collect(emqx_client_authenticate, Stats) ->
|
emqx_collect(emqx_client_authenticate, Stats) ->
|
||||||
|
@ -437,6 +452,43 @@ emqx_collect(emqx_session_discarded, Stats) ->
|
||||||
emqx_collect(emqx_session_terminated, Stats) ->
|
emqx_collect(emqx_session_terminated, Stats) ->
|
||||||
counter_metric(?C('session.terminated', Stats));
|
counter_metric(?C('session.terminated', Stats));
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Metrics - overload protection
|
||||||
|
emqx_collect(emqx_overload_protection_delay_ok, Stats) ->
|
||||||
|
counter_metric(?C('overload_protection.delay.ok', Stats));
|
||||||
|
emqx_collect(emqx_overload_protection_delay_timeout, Stats) ->
|
||||||
|
counter_metric(?C('overload_protection.delay.timeout', Stats));
|
||||||
|
emqx_collect(emqx_overload_protection_hibernation, Stats) ->
|
||||||
|
counter_metric(?C('overload_protection.hibernation', Stats));
|
||||||
|
emqx_collect(emqx_overload_protection_gc, Stats) ->
|
||||||
|
counter_metric(?C('overload_protection.gc', Stats));
|
||||||
|
emqx_collect(emqx_overload_protection_new_conn, Stats) ->
|
||||||
|
counter_metric(?C('overload_protection.new_conn', Stats));
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Metrics - acl
|
||||||
|
emqx_collect(emqx_authorization_allow, Stats) ->
|
||||||
|
counter_metric(?C('authorization.allow', Stats));
|
||||||
|
emqx_collect(emqx_authorization_deny, Stats) ->
|
||||||
|
counter_metric(?C('authorization.deny', Stats));
|
||||||
|
emqx_collect(emqx_authorization_cache_hit, Stats) ->
|
||||||
|
counter_metric(?C('authorization.cache_hit', Stats));
|
||||||
|
emqx_collect(emqx_authorization_superuser, Stats) ->
|
||||||
|
counter_metric(?C('authorization.superuser', Stats));
|
||||||
|
emqx_collect(emqx_authorization_nomatch, Stats) ->
|
||||||
|
counter_metric(?C('authorization.nomatch', Stats));
|
||||||
|
emqx_collect(emqx_authorization_matched_allow, Stats) ->
|
||||||
|
counter_metric(?C('authorization.matched_allow', Stats));
|
||||||
|
emqx_collect(emqx_authorization_matched_deny, Stats) ->
|
||||||
|
counter_metric(?C('authorization.matched_deny', Stats));
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Metrics - authn
|
||||||
|
emqx_collect(emqx_authentication_success, Stats) ->
|
||||||
|
counter_metric(?C('authentication.success', Stats));
|
||||||
|
emqx_collect(emqx_authentication_success_anonymous, Stats) ->
|
||||||
|
counter_metric(?C('authentication.success.anonymous', Stats));
|
||||||
|
emqx_collect(emqx_authentication_failure, Stats) ->
|
||||||
|
counter_metric(?C('authentication.failure', Stats));
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
%% VM
|
%% VM
|
||||||
|
|
||||||
emqx_collect(emqx_vm_cpu_use, VMData) ->
|
emqx_collect(emqx_vm_cpu_use, VMData) ->
|
||||||
|
@ -506,6 +558,38 @@ emqx_metrics_packets() ->
|
||||||
emqx_packets_auth_sent
|
emqx_packets_auth_sent
|
||||||
].
|
].
|
||||||
|
|
||||||
|
emqx_metrics_olp() ->
|
||||||
|
case emqx_config_zones:is_olp_enabled() of
|
||||||
|
true ->
|
||||||
|
[
|
||||||
|
emqx_overload_protection_delay_ok,
|
||||||
|
emqx_overload_protection_delay_timeout,
|
||||||
|
emqx_overload_protection_hibernation,
|
||||||
|
emqx_overload_protection_gc,
|
||||||
|
emqx_overload_protection_new_conn
|
||||||
|
];
|
||||||
|
false ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
emqx_metrics_acl() ->
|
||||||
|
[
|
||||||
|
emqx_authorization_allow,
|
||||||
|
emqx_authorization_deny,
|
||||||
|
emqx_authorization_cache_hit,
|
||||||
|
emqx_authorization_superuser,
|
||||||
|
emqx_authorization_nomatch,
|
||||||
|
emqx_authorization_matched_allow,
|
||||||
|
emqx_authorization_matched_deny
|
||||||
|
].
|
||||||
|
|
||||||
|
emqx_metrics_authn() ->
|
||||||
|
[
|
||||||
|
emqx_authentication_success,
|
||||||
|
emqx_authentication_success_anonymous,
|
||||||
|
emqx_authentication_failure
|
||||||
|
].
|
||||||
|
|
||||||
emqx_metrics_messages() ->
|
emqx_metrics_messages() ->
|
||||||
[
|
[
|
||||||
emqx_messages_received,
|
emqx_messages_received,
|
||||||
|
@ -539,6 +623,8 @@ emqx_metrics_delivery() ->
|
||||||
|
|
||||||
emqx_metrics_client() ->
|
emqx_metrics_client() ->
|
||||||
[
|
[
|
||||||
|
emqx_client_connect,
|
||||||
|
emqx_client_connack,
|
||||||
emqx_client_connected,
|
emqx_client_connected,
|
||||||
emqx_client_authenticate,
|
emqx_client_authenticate,
|
||||||
emqx_client_auth_anonymous,
|
emqx_client_auth_anonymous,
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Enhanced broker metrics collection and export by adding new metrics for messages, overload protection, authorization, authentication,
|
||||||
|
and improving naming consistency for OpenTelemetry.
|
Loading…
Reference in New Issue