emqx/apps/emqx_prometheus/src/emqx_prometheus.erl

596 lines
21 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus).
-behaviour(gen_server).
%% Please don't remove this attribute, it will
%% be used by the prometheus application
-behaviour(prometheus_collector).
-include_lib("prometheus/include/prometheus.hrl").
-include_lib("prometheus/include/prometheus_model.hrl").
-import(prometheus_model_helpers,
[ create_mf/5
, gauge_metric/1
, counter_metric/1
]).
%% APIs
-export([start_link/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, code_change/3
, terminate/2
]).
%% prometheus_collector callback
-export([ deregister_cleanup/1
, collect_mf/2
, collect_metrics/2
]).
-export([collect/1]).
-define(C(K, L), proplists:get_value(K, L, 0)).
-define(TIMER_MSG, '#interval').
-record(state, {push_gateway, timer, interval}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Opts]) ->
Interval = maps:get(interval, Opts),
PushGateway = maps:get(push_gateway_server, Opts),
{ok, ensure_timer(#state{push_gateway = PushGateway, interval = Interval})}.
handle_call(_Msg, _From, State) ->
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, R, ?TIMER_MSG}, State = #state{timer=R, push_gateway=Uri}) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"),
Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/",Name, "~", Ip]),
Data = prometheus_text_format:format(),
httpc:request(post, {Url, [], "text/plain", Data}, [{autoredirect, true}], []),
{noreply, ensure_timer(State)};
handle_info(_Msg, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.
ensure_timer(State = #state{interval = Interval}) ->
State#state{timer = emqx_misc:start_timer(Interval, ?TIMER_MSG)}.
%%--------------------------------------------------------------------
%% prometheus callbacks
%%--------------------------------------------------------------------
deregister_cleanup(_Registry) ->
ok.
collect_mf(_Registry, Callback) ->
Metrics = emqx_metrics:all(),
Stats = emqx_stats:getstats(),
VMData = emqx_vm_data(),
ClusterData = emqx_cluster_data(),
_ = [add_collect_family(Name, Stats, Callback, gauge) || Name <- emqx_stats()],
_ = [add_collect_family(Name, VMData, Callback, gauge) || Name <- emqx_vm()],
_ = [add_collect_family(Name, ClusterData, Callback, gauge) || Name <- emqx_cluster()],
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_packets()],
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_messages()],
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_delivery()],
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_client()],
_ = [add_collect_family(Name, Metrics, Callback, counter) || Name <- emqx_metrics_session()],
ok.
%% @private
collect(<<"json">>) ->
Metrics = emqx_metrics:all(),
Stats = emqx_stats:getstats(),
VMData = emqx_vm_data(),
#{stats => maps:from_list([collect_stats(Name, Stats) || Name <- emqx_stats()]),
metrics => maps:from_list([collect_stats(Name, VMData) || Name <- emqx_vm()]),
packets => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]),
messages => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]),
delivery => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]),
client => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]),
session => maps:from_list([collect_stats(Name, Metrics) || Name <- emqx_metrics_session()])};
collect(<<"prometheus">>) ->
prometheus_text_format:format().
%% @private
collect_stats(Name, Stats) ->
R = collect_metrics(Name, Stats),
case R#'Metric'.gauge of
undefined ->
{_, Val} = R#'Metric'.counter,
{Name, Val};
{_, Val} ->
{Name, Val}
end.
collect_metrics(Name, Metrics) ->
emqx_collect(Name, Metrics).
add_collect_family(Name, Data, Callback, Type) ->
Callback(create_schema(Name, <<"">>, Data, Type)).
create_schema(Name, Help, Data, Type) ->
create_mf(Name, Help, Type, ?MODULE, Data).
%%--------------------------------------------------------------------
%% Collector
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Stats
%% connections
emqx_collect(emqx_connections_count, Stats) ->
gauge_metric(?C('connections.count', Stats));
emqx_collect(emqx_connections_max, Stats) ->
gauge_metric(?C('connections.max', Stats));
%% sessions
emqx_collect(emqx_sessions_count, Stats) ->
gauge_metric(?C('sessions.count', Stats));
emqx_collect(emqx_sessions_max, Stats) ->
gauge_metric(?C('sessions.max', Stats));
%% pub/sub stats
emqx_collect(emqx_topics_count, Stats) ->
gauge_metric(?C('topics.count', Stats));
emqx_collect(emqx_topics_max, Stats) ->
gauge_metric(?C('topics.max', Stats));
emqx_collect(emqx_suboptions_count, Stats) ->
gauge_metric(?C('suboptions.count', Stats));
emqx_collect(emqx_suboptions_max, Stats) ->
gauge_metric(?C('suboptions.max', Stats));
emqx_collect(emqx_subscribers_count, Stats) ->
gauge_metric(?C('subscribers.count', Stats));
emqx_collect(emqx_subscribers_max, Stats) ->
gauge_metric(?C('subscribers.max', Stats));
emqx_collect(emqx_subscriptions_count, Stats) ->
gauge_metric(?C('subscriptions.count', Stats));
emqx_collect(emqx_subscriptions_max, Stats) ->
gauge_metric(?C('subscriptions.max', Stats));
emqx_collect(emqx_subscriptions_shared_count, Stats) ->
gauge_metric(?C('subscriptions.shared.count', Stats));
emqx_collect(emqx_subscriptions_shared_max, Stats) ->
gauge_metric(?C('subscriptions.shared.max', Stats));
%% routes
emqx_collect(emqx_routes_count, Stats) ->
gauge_metric(?C('routes.count', Stats));
emqx_collect(emqx_routes_max, Stats) ->
gauge_metric(?C('routes.max', Stats));
%% retained
emqx_collect(emqx_retained_count, Stats) ->
gauge_metric(?C('retained.count', Stats));
emqx_collect(emqx_retained_max, Stats) ->
gauge_metric(?C('retained.max', Stats));
%%--------------------------------------------------------------------
%% Metrics - packets & bytes
%% bytes
emqx_collect(emqx_bytes_received, Metrics) ->
counter_metric(?C('bytes.received', Metrics));
emqx_collect(emqx_bytes_sent, Metrics) ->
counter_metric(?C('bytes.sent', Metrics));
%% received.sent
emqx_collect(emqx_packets_received, Metrics) ->
counter_metric(?C('packets.received', Metrics));
emqx_collect(emqx_packets_sent, Metrics) ->
counter_metric(?C('packets.sent', Metrics));
%% connect
emqx_collect(emqx_packets_connect, Metrics) ->
counter_metric(?C('packets.connect.received', Metrics));
emqx_collect(emqx_packets_connack_sent, Metrics) ->
counter_metric(?C('packets.connack.sent', Metrics));
emqx_collect(emqx_packets_connack_error, Metrics) ->
counter_metric(?C('packets.connack.error', Metrics));
emqx_collect(emqx_packets_connack_auth_error, Metrics) ->
counter_metric(?C('packets.connack.auth_error', Metrics));
%% sub.unsub
emqx_collect(emqx_packets_subscribe_received, Metrics) ->
counter_metric(?C('packets.subscribe.received', Metrics));
emqx_collect(emqx_packets_subscribe_auth_error, Metrics) ->
counter_metric(?C('packets.subscribe.auth_error', Metrics));
emqx_collect(emqx_packets_subscribe_error, Metrics) ->
counter_metric(?C('packets.subscribe.error', Metrics));
emqx_collect(emqx_packets_suback_sent, Metrics) ->
counter_metric(?C('packets.suback.sent', Metrics));
emqx_collect(emqx_packets_unsubscribe_received, Metrics) ->
counter_metric(?C('packets.unsubscribe.received', Metrics));
emqx_collect(emqx_packets_unsubscribe_error, Metrics) ->
counter_metric(?C('packets.unsubscribe.error', Metrics));
emqx_collect(emqx_packets_unsuback_sent, Metrics) ->
counter_metric(?C('packets.unsuback.sent', Metrics));
%% publish.puback
emqx_collect(emqx_packets_publish_received, Metrics) ->
counter_metric(?C('packets.publish.received', Metrics));
emqx_collect(emqx_packets_publish_sent, Metrics) ->
counter_metric(?C('packets.publish.sent', Metrics));
emqx_collect(emqx_packets_publish_inuse, Metrics) ->
counter_metric(?C('packets.publish.inuse', Metrics));
emqx_collect(emqx_packets_publish_error, Metrics) ->
counter_metric(?C('packets.publish.error', Metrics));
emqx_collect(emqx_packets_publish_auth_error, Metrics) ->
counter_metric(?C('packets.publish.auth_error', Metrics));
emqx_collect(emqx_packets_publish_dropped, Metrics) ->
counter_metric(?C('packets.publish.dropped', Metrics));
%% puback
emqx_collect(emqx_packets_puback_received, Metrics) ->
counter_metric(?C('packets.puback.received', Metrics));
emqx_collect(emqx_packets_puback_sent, Metrics) ->
counter_metric(?C('packets.puback.sent', Metrics));
emqx_collect(emqx_packets_puback_inuse, Metrics) ->
counter_metric(?C('packets.puback.inuse', Metrics));
emqx_collect(emqx_packets_puback_missed, Metrics) ->
counter_metric(?C('packets.puback.missed', Metrics));
%% pubrec
emqx_collect(emqx_packets_pubrec_received, Metrics) ->
counter_metric(?C('packets.pubrec.received', Metrics));
emqx_collect(emqx_packets_pubrec_sent, Metrics) ->
counter_metric(?C('packets.pubrec.sent', Metrics));
emqx_collect(emqx_packets_pubrec_inuse, Metrics) ->
counter_metric(?C('packets.pubrec.inuse', Metrics));
emqx_collect(emqx_packets_pubrec_missed, Metrics) ->
counter_metric(?C('packets.pubrec.missed', Metrics));
%% pubrel
emqx_collect(emqx_packets_pubrel_received, Metrics) ->
counter_metric(?C('packets.pubrel.received', Metrics));
emqx_collect(emqx_packets_pubrel_sent, Metrics) ->
counter_metric(?C('packets.pubrel.sent', Metrics));
emqx_collect(emqx_packets_pubrel_missed, Metrics) ->
counter_metric(?C('packets.pubrel.missed', Metrics));
%% pubcomp
emqx_collect(emqx_packets_pubcomp_received, Metrics) ->
counter_metric(?C('packets.pubcomp.received', Metrics));
emqx_collect(emqx_packets_pubcomp_sent, Metrics) ->
counter_metric(?C('packets.pubcomp.sent', Metrics));
emqx_collect(emqx_packets_pubcomp_inuse, Metrics) ->
counter_metric(?C('packets.pubcomp.inuse', Metrics));
emqx_collect(emqx_packets_pubcomp_missed, Metrics) ->
counter_metric(?C('packets.pubcomp.missed', Metrics));
%% pingreq
emqx_collect(emqx_packets_pingreq_received, Metrics) ->
counter_metric(?C('packets.pingreq.received', Metrics));
emqx_collect(emqx_packets_pingresp_sent, Metrics) ->
counter_metric(?C('packets.pingresp.sent', Metrics));
%% disconnect
emqx_collect(emqx_packets_disconnect_received, Metrics) ->
counter_metric(?C('packets.disconnect.received', Metrics));
emqx_collect(emqx_packets_disconnect_sent, Metrics) ->
counter_metric(?C('packets.disconnect.sent', Metrics));
%% auth
emqx_collect(emqx_packets_auth_received, Metrics) ->
counter_metric(?C('packets.auth.received', Metrics));
emqx_collect(emqx_packets_auth_sent, Metrics) ->
counter_metric(?C('packets.auth.sent', Metrics));
%%--------------------------------------------------------------------
%% Metrics - messages
%% messages
emqx_collect(emqx_messages_received, Metrics) ->
counter_metric(?C('messages.received', Metrics));
emqx_collect(emqx_messages_sent, Metrics) ->
counter_metric(?C('messages.sent', Metrics));
emqx_collect(emqx_messages_qos0_received, Metrics) ->
counter_metric(?C('messages.qos0.received', Metrics));
emqx_collect(emqx_messages_qos0_sent, Metrics) ->
counter_metric(?C('messages.qos0.sent', Metrics));
emqx_collect(emqx_messages_qos1_received, Metrics) ->
counter_metric(?C('messages.qos1.received', Metrics));
emqx_collect(emqx_messages_qos1_sent, Metrics) ->
counter_metric(?C('messages.qos1.sent', Metrics));
emqx_collect(emqx_messages_qos2_received, Metrics) ->
counter_metric(?C('messages.qos2.received', Metrics));
emqx_collect(emqx_messages_qos2_sent, Metrics) ->
counter_metric(?C('messages.qos2.sent', Metrics));
emqx_collect(emqx_messages_publish, Metrics) ->
counter_metric(?C('messages.publish', Metrics));
emqx_collect(emqx_messages_dropped, Metrics) ->
counter_metric(?C('messages.dropped', Metrics));
emqx_collect(emqx_messages_dropped_expired, Metrics) ->
counter_metric(?C('messages.dropped.expired', Metrics));
emqx_collect(emqx_messages_dropped_no_subscribers, Metrics) ->
counter_metric(?C('messages.dropped.no_subscribers', Metrics));
emqx_collect(emqx_messages_forward, Metrics) ->
counter_metric(?C('messages.forward', Metrics));
emqx_collect(emqx_messages_retained, Metrics) ->
counter_metric(?C('messages.retained', Metrics));
emqx_collect(emqx_messages_delayed, Stats) ->
counter_metric(?C('messages.delayed', Stats));
emqx_collect(emqx_messages_delivered, Stats) ->
counter_metric(?C('messages.delivered', Stats));
emqx_collect(emqx_messages_acked, Stats) ->
counter_metric(?C('messages.acked', Stats));
%%--------------------------------------------------------------------
%% Metrics - delivery
emqx_collect(emqx_delivery_dropped, Stats) ->
counter_metric(?C('delivery.dropped', Stats));
emqx_collect(emqx_delivery_dropped_no_local, Stats) ->
counter_metric(?C('delivery.dropped.no_local', Stats));
emqx_collect(emqx_delivery_dropped_too_large, Stats) ->
counter_metric(?C('delivery.dropped.too_large', Stats));
emqx_collect(emqx_delivery_dropped_qos0_msg, Stats) ->
counter_metric(?C('delivery.dropped.qos0_msg', Stats));
emqx_collect(emqx_delivery_dropped_queue_full, Stats) ->
counter_metric(?C('delivery.dropped.queue_full', Stats));
emqx_collect(emqx_delivery_dropped_expired, Stats) ->
counter_metric(?C('delivery.dropped.expired', Stats));
%%--------------------------------------------------------------------
%% Metrics - client
emqx_collect(emqx_client_connected, Stats) ->
counter_metric(?C('client.connected', Stats));
emqx_collect(emqx_client_authenticate, Stats) ->
counter_metric(?C('client.authenticate', Stats));
emqx_collect(emqx_client_auth_anonymous, Stats) ->
counter_metric(?C('client.auth.anonymous', Stats));
emqx_collect(emqx_client_authorize, Stats) ->
counter_metric(?C('client.authorize', Stats));
emqx_collect(emqx_client_subscribe, Stats) ->
counter_metric(?C('client.subscribe', Stats));
emqx_collect(emqx_client_unsubscribe, Stats) ->
counter_metric(?C('client.unsubscribe', Stats));
emqx_collect(emqx_client_disconnected, Stats) ->
counter_metric(?C('client.disconnected', Stats));
%%--------------------------------------------------------------------
%% Metrics - session
emqx_collect(emqx_session_created, Stats) ->
counter_metric(?C('session.created', Stats));
emqx_collect(emqx_session_resumed, Stats) ->
counter_metric(?C('session.resumed', Stats));
emqx_collect(emqx_session_takenover, Stats) ->
counter_metric(?C('session.takenover', Stats));
emqx_collect(emqx_session_discarded, Stats) ->
counter_metric(?C('session.discarded', Stats));
emqx_collect(emqx_session_terminated, Stats) ->
counter_metric(?C('session.terminated', Stats));
%%--------------------------------------------------------------------
%% VM
emqx_collect(emqx_vm_cpu_use, VMData) ->
gauge_metric(?C(cpu_use, VMData));
emqx_collect(emqx_vm_cpu_idle, VMData) ->
gauge_metric(?C(cpu_idle, VMData));
emqx_collect(emqx_vm_run_queue, VMData) ->
gauge_metric(?C(run_queue, VMData));
emqx_collect(emqx_vm_process_messages_in_queues, VMData) ->
gauge_metric(?C(process_total_messages, VMData));
emqx_collect(emqx_vm_total_memory, VMData) ->
gauge_metric(?C(total_memory, VMData));
emqx_collect(emqx_vm_used_memory, VMData) ->
gauge_metric(?C(used_memory, VMData));
emqx_collect(emqx_cluster_nodes_running, ClusterData) ->
gauge_metric(?C(nodes_running, ClusterData));
emqx_collect(emqx_cluster_nodes_stopped, ClusterData) ->
gauge_metric(?C(nodes_stopped, ClusterData)).
%%--------------------------------------------------------------------
%% Indicators
%%--------------------------------------------------------------------
emqx_stats() ->
[ emqx_connections_count
, emqx_connections_max
, emqx_sessions_count
, emqx_sessions_max
, emqx_topics_count
, emqx_topics_max
, emqx_suboptions_count
, emqx_suboptions_max
, emqx_subscribers_count
, emqx_subscribers_max
, emqx_subscriptions_count
, emqx_subscriptions_max
, emqx_subscriptions_shared_count
, emqx_subscriptions_shared_max
, emqx_routes_count
, emqx_routes_max
, emqx_retained_count
, emqx_retained_max
].
emqx_metrics_packets() ->
[ emqx_bytes_received
, emqx_bytes_sent
, emqx_packets_received
, emqx_packets_sent
, emqx_packets_connect
, emqx_packets_connack_sent
, emqx_packets_connack_error
, emqx_packets_connack_auth_error
, emqx_packets_publish_received
, emqx_packets_publish_sent
, emqx_packets_publish_inuse
, emqx_packets_publish_error
, emqx_packets_publish_auth_error
, emqx_packets_publish_dropped
, emqx_packets_puback_received
, emqx_packets_puback_sent
, emqx_packets_puback_inuse
, emqx_packets_puback_missed
, emqx_packets_pubrec_received
, emqx_packets_pubrec_sent
, emqx_packets_pubrec_inuse
, emqx_packets_pubrec_missed
, emqx_packets_pubrel_received
, emqx_packets_pubrel_sent
, emqx_packets_pubrel_missed
, emqx_packets_pubcomp_received
, emqx_packets_pubcomp_sent
, emqx_packets_pubcomp_inuse
, emqx_packets_pubcomp_missed
, emqx_packets_subscribe_received
, emqx_packets_subscribe_error
, emqx_packets_subscribe_auth_error
, emqx_packets_suback_sent
, emqx_packets_unsubscribe_received
, emqx_packets_unsubscribe_error
, emqx_packets_unsuback_sent
, emqx_packets_pingreq_received
, emqx_packets_pingresp_sent
, emqx_packets_disconnect_received
, emqx_packets_disconnect_sent
, emqx_packets_auth_received
, emqx_packets_auth_sent
].
emqx_metrics_messages() ->
[ emqx_messages_received
, emqx_messages_sent
, emqx_messages_qos0_received
, emqx_messages_qos0_sent
, emqx_messages_qos1_received
, emqx_messages_qos1_sent
, emqx_messages_qos2_received
, emqx_messages_qos2_sent
, emqx_messages_publish
, emqx_messages_dropped
, emqx_messages_dropped_expired
, emqx_messages_dropped_no_subscribers
, emqx_messages_forward
, emqx_messages_retained
, emqx_messages_delayed
, emqx_messages_delivered
, emqx_messages_acked
].
emqx_metrics_delivery() ->
[ emqx_delivery_dropped
, emqx_delivery_dropped_no_local
, emqx_delivery_dropped_too_large
, emqx_delivery_dropped_qos0_msg
, emqx_delivery_dropped_queue_full
, emqx_delivery_dropped_expired
].
emqx_metrics_client() ->
[ emqx_client_connected
, emqx_client_authenticate
, emqx_client_auth_anonymous
, emqx_client_authorize
, emqx_client_subscribe
, emqx_client_unsubscribe
, emqx_client_disconnected
].
emqx_metrics_session() ->
[ emqx_session_created
, emqx_session_resumed
, emqx_session_takenover
, emqx_session_discarded
, emqx_session_terminated
].
emqx_vm() ->
[ emqx_vm_cpu_use
, emqx_vm_cpu_idle
, emqx_vm_run_queue
, emqx_vm_process_messages_in_queues
, emqx_vm_total_memory
, emqx_vm_used_memory
].
emqx_vm_data() ->
Idle = case cpu_sup:util([detailed]) of
{_, 0, 0, _} -> 0; %% Not support for Windows
{_Num, _Use, IdleList, _} -> ?C(idle, IdleList)
end,
RunQueue = erlang:statistics(run_queue),
[{run_queue, RunQueue},
{process_total_messages, 0}, %% XXX: Plan removed at v5.0
{cpu_idle, Idle},
{cpu_use, 100 - Idle}] ++ emqx_vm:mem_info().
emqx_cluster() ->
[ emqx_cluster_nodes_running
, emqx_cluster_nodes_stopped
].
emqx_cluster_data() ->
#{running_nodes := Running, stopped_nodes := Stopped} = mria_mnesia:cluster_info(),
[{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}].