Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-06-06 13:47:12 +08:00
commit 5025b2f65d
16 changed files with 439 additions and 311 deletions

View File

@ -75,6 +75,8 @@ start() ->
restart(ConfFile) ->
reload_config(ConfFile),
shutdown(),
ok = application:stop(mnesia),
application:start(mnesia),
reboot().
%% @doc Stop emqx application.

View File

@ -295,7 +295,7 @@ dispatch({shard, I}, Topic, Msg) ->
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
ok;
inc_dropped_cnt(_Topic) ->
emqx_metrics:inc('messages/dropped').
emqx_metrics:inc('messages.dropped').
-spec(subscribers(emqx_topic:topic()) -> [pid()]).
subscribers(Topic) when is_binary(Topic) ->
@ -377,9 +377,9 @@ topics() ->
%%------------------------------------------------------------------------------
stats_fun() ->
safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'),
safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'),
safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max').
safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'),
safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'),
safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max').
safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of

View File

@ -205,5 +205,5 @@ clean_down({Pid, ClientId}) ->
stats_fun() ->
case ets:info(?CONN_TAB, size) of
undefined -> ok;
Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)
Size -> emqx_stats:setstat('connections.count', 'connections.max', Size)
end.

View File

@ -219,7 +219,7 @@ connected(enter, _, _State) ->
%% Handle Input
connected(cast, {incoming, Packet = ?PACKET(Type)}, State) ->
_ = emqx_metrics:received(Packet),
ok = emqx_metrics:inc_recv(Packet),
(Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
handle_packet(Packet, fun(NState) ->
{keep_state, NState}
@ -296,7 +296,7 @@ handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
Oct = iolist_size(Data),
?LOG(debug, "[Connection] RECV ~p", [Data]),
emqx_pd:update_counter(incoming_bytes, Oct),
emqx_metrics:trans(inc, 'bytes/received', Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
NState = ensure_stats_timer(maybe_gc({1, Oct}, State)),
process_incoming(Data, [], NState);
@ -336,7 +336,6 @@ handle(info, {timeout, Timer, emit_stats},
State = #state{stats_timer = Timer,
proto_state = ProtoState,
gc_state = GcState}) ->
emqx_metrics:commit(),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
NState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),

View File

@ -20,30 +20,31 @@
-include("types.hrl").
-include("emqx_mqtt.hrl").
-export([start_link/0]).
-export([ start_link/0
, stop/0
]).
-export([ new/1
, new/2
, all/0
]).
-export([ val/1
, inc/1
, inc/2
, inc/3
, dec/1
, dec/2
, dec/3
, set/2
]).
-export([ trans/2
, trans/3
, trans/4
, commit/0
]).
%% Received/sent metrics
-export([ received/1
, sent/1
%% Inc received/sent metrics
-export([ inc_recv/1
, inc_sent/1
]).
%% gen_server callbacks
@ -55,267 +56,320 @@
, code_change/3
]).
%% Bytes sent and received of Broker
-opaque(metric_idx() :: 1..1024).
-type(metric_name() :: atom() | string() | binary()).
-export_type([metric_idx/0]).
-define(MAX_SIZE, 1024).
-define(RESERVED_IDX, 256).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
%% Bytes sent and received of broker
-define(BYTES_METRICS, [
{counter, 'bytes/received'}, % Total bytes received
{counter, 'bytes/sent'} % Total bytes sent
{counter, 'bytes.received'}, % Total bytes received
{counter, 'bytes.sent'} % Total bytes sent
]).
%% Packets sent and received of broker
-define(PACKET_METRICS, [
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect/received'}, % DISCONNECT Packets received
{counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent
{counter, 'packets/auth'} % Auth Packets received
{counter, 'packets.received'}, % All Packets received
{counter, 'packets.sent'}, % All Packets sent
{counter, 'packets.connect.received'}, % CONNECT Packets received
{counter, 'packets.connack.sent'}, % CONNACK Packets sent
{counter, 'packets.connack.error'}, % CONNACK error sent
{counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent
{counter, 'packets.publish.received'}, % PUBLISH packets received
{counter, 'packets.publish.sent'}, % PUBLISH packets sent
{counter, 'packets.publish.error'}, % PUBLISH failed for error
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
{counter, 'packets.puback.received'}, % PUBACK packets received
{counter, 'packets.puback.sent'}, % PUBACK packets sent
{counter, 'packets.puback.missed'}, % PUBACK packets missed
{counter, 'packets.pubrec.received'}, % PUBREC packets received
{counter, 'packets.pubrec.sent'}, % PUBREC packets sent
{counter, 'packets.pubrec.missed'}, % PUBREC packets missed
{counter, 'packets.pubrel.received'}, % PUBREL packets received
{counter, 'packets.pubrel.sent'}, % PUBREL packets sent
{counter, 'packets.pubrel.missed'}, % PUBREL packets missed
{counter, 'packets.pubcomp.received'}, % PUBCOMP packets received
{counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent
{counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed
{counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received
{counter, 'packets.subscribe.error'}, % SUBSCRIBE error
{counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth
{counter, 'packets.suback.sent'}, % SUBACK packets sent
{counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received
{counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error
{counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent
{counter, 'packets.pingreq.received'}, % PINGREQ packets received
{counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent
{counter, 'packets.disconnect.received'}, % DISCONNECT Packets received
{counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent
{counter, 'packets.auth.received'}, % Auth Packets received
{counter, 'packets.auth.sent'} % Auth Packets sent
]).
%% Messages sent and received of broker
-define(MESSAGE_METRICS, [
{counter, 'messages/received'}, % All Messages received
{counter, 'messages/sent'}, % All Messages sent
{counter, 'messages/qos0/received'}, % QoS0 Messages received
{counter, 'messages/qos0/sent'}, % QoS0 Messages sent
{counter, 'messages/qos1/received'}, % QoS1 Messages received
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent
{counter, 'messages/qos2/received'}, % QoS2 Messages received
{counter, 'messages/qos2/expired'}, % QoS2 Messages expired
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
{gauge, 'messages/retained'}, % Messagea retained
{counter, 'messages/dropped'}, % Messages dropped
{counter, 'messages/expired'}, % Messages expired
{counter, 'messages/forward'} % Messages forward
{counter, 'messages.received'}, % All Messages received
{counter, 'messages.sent'}, % All Messages sent
{counter, 'messages.qos0.received'}, % QoS0 Messages received
{counter, 'messages.qos0.sent'}, % QoS0 Messages sent
{counter, 'messages.qos1.received'}, % QoS1 Messages received
{counter, 'messages.qos1.sent'}, % QoS1 Messages sent
{counter, 'messages.qos2.received'}, % QoS2 Messages received
{counter, 'messages.qos2.expired'}, % QoS2 Messages expired
{counter, 'messages.qos2.sent'}, % QoS2 Messages sent
{counter, 'messages.qos2.dropped'}, % QoS2 Messages dropped
{gauge, 'messages.retained'}, % Messagea retained
{counter, 'messages.dropped'}, % Messages dropped
{counter, 'messages.expired'}, % Messages expired
{counter, 'messages.forward'} % Messages forward
]).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-record(state, {next_idx = 1}).
-record(metric, {name, type, idx}).
%% @doc Start the metrics server.
-spec(start_link() -> startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(stop() -> ok).
stop() ->
gen_server:stop(?SERVER).
%%------------------------------------------------------------------------------
%% Metrics API
%%------------------------------------------------------------------------------
new({gauge, Name}) ->
ets:insert(?TAB, {{Name, 0}, 0});
-spec(new(metric_name()) -> ok).
new(Name) ->
new(counter, Name).
new({counter, Name}) ->
Schedulers = lists:seq(1, emqx_vm:schedulers()),
ets:insert(?TAB, [{{Name, I}, 0} || I <- Schedulers]).
-spec(new(gauge|counter, metric_name()) -> ok).
new(gauge, Name) ->
create(gauge, Name);
new(counter, Name) ->
create(counter, Name).
%% @private
create(Type, Name) ->
case gen_server:call(?SERVER, {create, Type, Name}) of
{ok, _Idx} -> ok;
{error, Reason} -> error(Reason)
end.
%% @doc Get all metrics
-spec(all() -> [{atom(), non_neg_integer()}]).
-spec(all() -> [{metric_name(), non_neg_integer()}]).
all() ->
maps:to_list(
ets:foldl(
fun({{Metric, _N}, Val}, Map) ->
case maps:find(Metric, Map) of
{ok, Count} -> maps:put(Metric, Count+Val, Map);
error -> maps:put(Metric, Val, Map)
end
end, #{}, ?TAB)).
CRef = persistent_term:get(?MODULE),
[{Name, counters:get(CRef, Idx)}
|| #metric{name = Name, idx = Idx} <- ets:tab2list(?TAB)].
%% @doc Get metric value
-spec(val(atom()) -> non_neg_integer()).
val(Metric) ->
lists:sum(ets:select(?TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
-spec(val(metric_name()) -> maybe(non_neg_integer())).
val(Name) ->
case ets:lookup(?TAB, Name) of
[#metric{idx = Idx}] ->
CRef = persistent_term:get(?MODULE),
counters:get(CRef, Idx);
[] -> undefined
end.
%% @doc Increase counter
-spec(inc(atom()) -> non_neg_integer()).
inc(Metric) ->
inc(counter, Metric, 1).
-spec(inc(metric_name()) -> ok).
inc(Name) ->
inc(Name, 1).
%% @doc Increase metric value
-spec(inc({counter | gauge, atom()} | atom(), pos_integer()) -> non_neg_integer()).
inc({gauge, Metric}, Val) ->
inc(gauge, Metric, Val);
inc({counter, Metric}, Val) ->
inc(counter, Metric, Val);
inc(Metric, Val) when is_atom(Metric) ->
inc(counter, Metric, Val).
%% @doc Increase metric value
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
inc(Type, Metric, Val) ->
update_counter(key(Type, Metric), {2, Val}).
-spec(inc(metric_name(), pos_integer()) -> ok).
inc(Name, Value) ->
update_counter(Name, Value).
%% @doc Decrease metric value
-spec(dec(gauge, atom()) -> integer()).
dec(gauge, Metric) ->
dec(gauge, Metric, 1).
-spec(dec(metric_name()) -> ok).
dec(Name) ->
dec(Name, 1).
%% @doc Decrease metric value
-spec(dec(gauge, atom(), pos_integer()) -> integer()).
dec(gauge, Metric, Val) ->
update_counter(key(gauge, Metric), {2, -Val}).
-spec(dec(metric_name(), pos_integer()) -> ok).
dec(Name, Value) ->
update_counter(Name, -Value).
%% @doc Set metric value
set(Metric, Val) when is_atom(Metric) ->
set(gauge, Metric, Val).
set(gauge, Metric, Val) ->
ets:insert(?TAB, {key(gauge, Metric), Val}).
-spec(set(metric_name(), integer()) -> ok).
set(Name, Value) ->
CRef = persistent_term:get(?MODULE),
Idx = ets:lookup_element(?TAB, Name, 4),
counters:put(CRef, Idx, Value).
trans(inc, Metric) ->
trans(inc, {counter, Metric}, 1).
-spec(trans(inc | dec, metric_name()) -> ok).
trans(Op, Name) when Op =:= inc; Op =:= dec ->
trans(Op, Name, 1).
trans(Opt, {gauge, Metric}, Val) ->
trans(Opt, gauge, Metric, Val);
trans(inc, {counter, Metric}, Val) ->
trans(inc, counter, Metric, Val);
trans(inc, Metric, Val) when is_atom(Metric) ->
trans(inc, counter, Metric, Val);
trans(dec, gauge, Metric) ->
trans(dec, gauge, Metric, 1).
-spec(trans(inc | dec, metric_name(), pos_integer()) -> ok).
trans(inc, Name, Value) ->
cache(Name, Value);
trans(dec, Name, Value) ->
cache(Name, -Value).
trans(inc, Type, Metric, Val) ->
hold(Type, Metric, Val);
trans(dec, gauge, Metric, Val) ->
hold(gauge, Metric, -Val).
hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
-spec(cache(metric_name(), integer()) -> ok).
cache(Name, Value) ->
put('$metrics', case get('$metrics') of
undefined ->
#{{Type, Metric} => Val};
#{Name => Value};
Metrics ->
maps:update_with({Type, Metric}, fun(Cnt) -> Cnt + Val end, Val, Metrics)
end).
maps:update_with(Name, fun(Cnt) -> Cnt + Value end, Value, Metrics)
end),
ok.
-spec(commit() -> ok).
commit() ->
case get('$metrics') of
undefined -> ok;
Metrics ->
maps:fold(fun({Type, Metric}, Val, _Acc) ->
update_counter(key(Type, Metric), {2, Val})
end, 0, Metrics),
erase('$metrics')
_ = erase('$metrics'),
lists:foreach(fun update_counter/1, maps:to_list(Metrics))
end.
%% @doc Metric key
key(gauge, Metric) ->
{Metric, 0};
key(counter, Metric) ->
{Metric, erlang:system_info(scheduler_id)}.
update_counter({Name, Value}) ->
update_counter(Name, Value).
update_counter(Key, UpOp) ->
ets:update_counter(?TAB, Key, UpOp).
update_counter(Name, Value) ->
CRef = persistent_term:get(?MODULE),
CIdx = case reserved_idx(Name) of
Idx when is_integer(Idx) -> Idx;
undefined ->
ets:lookup_element(?TAB, Name, 4)
end,
counters:add(CRef, CIdx, Value).
%%-----------------------------------------------------------------------------
%% Received/Sent metrics
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Inc Received/Sent metrics
%%------------------------------------------------------------------------------
%% @doc Count packets received.
-spec(received(emqx_mqtt_types:packet()) -> ok).
received(Packet) ->
inc('packets/received'),
received1(Packet).
received1(?PUBLISH_PACKET(QoS, _PktId)) ->
inc('packets/publish/received'),
inc('messages/received'),
qos_received(QoS);
received1(?PACKET(Type)) ->
received2(Type).
received2(?CONNECT) ->
inc('packets/connect');
received2(?PUBACK) ->
inc('packets/puback/received');
received2(?PUBREC) ->
inc('packets/pubrec/received');
received2(?PUBREL) ->
inc('packets/pubrel/received');
received2(?PUBCOMP) ->
inc('packets/pubcomp/received');
received2(?SUBSCRIBE) ->
inc('packets/subscribe');
received2(?UNSUBSCRIBE) ->
inc('packets/unsubscribe');
received2(?PINGREQ) ->
inc('packets/pingreq');
received2(?DISCONNECT) ->
inc('packets/disconnect/received');
received2(_) ->
%% @doc Inc packets received.
-spec(inc_recv(emqx_mqtt_types:packet()) -> ok).
inc_recv(Packet) ->
inc('packets.received'),
do_inc_recv(Packet).
do_inc_recv(?PACKET(?CONNECT)) ->
inc('packets.connect.received');
do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) ->
inc('messages.received'),
case QoS of
?QOS_0 -> inc('messages.qos0.received');
?QOS_1 -> inc('messages.qos1.received');
?QOS_2 -> inc('messages.qos2.received')
end,
inc('packets.publish.received');
do_inc_recv(?PACKET(?PUBACK)) ->
inc('packets.puback.received');
do_inc_recv(?PACKET(?PUBREC)) ->
inc('packets.pubrec.received');
do_inc_recv(?PACKET(?PUBREL)) ->
inc('packets.pubrel.received');
do_inc_recv(?PACKET(?PUBCOMP)) ->
inc('packets.pubcomp.received');
do_inc_recv(?PACKET(?SUBSCRIBE)) ->
inc('packets.subscribe.received');
do_inc_recv(?PACKET(?UNSUBSCRIBE)) ->
inc('packets.unsubscribe.received');
do_inc_recv(?PACKET(?PINGREQ)) ->
inc('packets.pingreq.received');
do_inc_recv(?PACKET(?DISCONNECT)) ->
inc('packets.disconnect.received');
do_inc_recv(?PACKET(?AUTH)) ->
inc('packets.auth.received');
do_inc_recv(_Packet) ->
ignore.
qos_received(?QOS_0) ->
inc('messages/qos0/received');
qos_received(?QOS_1) ->
inc('messages/qos1/received');
qos_received(?QOS_2) ->
inc('messages/qos2/received').
%% @doc Count packets received. Will not count $SYS PUBLISH.
-spec(sent(emqx_mqtt_types:packet()) -> ignore | non_neg_integer()).
sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
-spec(inc_sent(emqx_mqtt_types:packet()) -> ok | ignore).
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
ignore;
sent(Packet) ->
inc('packets/sent'),
sent1(Packet).
sent1(?PUBLISH_PACKET(QoS, _PktId)) ->
inc('packets/publish/sent'),
inc('messages/sent'),
qos_sent(QoS);
sent1(?PACKET(Type)) ->
sent2(Type).
sent2(?CONNACK) ->
inc('packets/connack');
sent2(?PUBACK) ->
inc('packets/puback/sent');
sent2(?PUBREC) ->
inc('packets/pubrec/sent');
sent2(?PUBREL) ->
inc('packets/pubrel/sent');
sent2(?PUBCOMP) ->
inc('packets/pubcomp/sent');
sent2(?SUBACK) ->
inc('packets/suback');
sent2(?UNSUBACK) ->
inc('packets/unsuback');
sent2(?PINGRESP) ->
inc('packets/pingresp');
sent2(?DISCONNECT) ->
inc('packets/disconnect/sent');
sent2(_Type) ->
inc_sent(Packet) ->
inc('packets.sent'),
do_inc_sent(Packet).
do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
(ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'),
(ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'),
inc('packets.connack.sent');
do_inc_sent(?PUBLISH_PACKET(QoS, _PacketId)) ->
inc('messages.sent'),
case QoS of
?QOS_0 -> inc('messages.qos0.sent');
?QOS_1 -> inc('messages.qos1.sent');
?QOS_2 -> inc('messages.qos2.sent')
end,
inc('packets.publish.sent');
do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) ->
(ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
inc('packets.puback.sent');
do_inc_sent(?PUBREC_PACKET(_PacketId, ReasonCode)) ->
(ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
inc('packets.pubrec.sent');
do_inc_sent(?PACKET(?PUBREL)) ->
inc('packets.pubrel.sent');
do_inc_sent(?PACKET(?PUBCOMP)) ->
inc('packets.pubcomp.sent');
do_inc_sent(?PACKET(?SUBACK)) ->
inc('packets.suback.sent');
do_inc_sent(?PACKET(?UNSUBACK)) ->
inc('packets.unsuback.sent');
do_inc_sent(?PACKET(?PINGRESP)) ->
inc('packets.pingresp.sent');
do_inc_sent(?PACKET(?DISCONNECT)) ->
inc('packets.disconnect.sent');
do_inc_sent(?PACKET(?AUTH)) ->
inc('packets.auth.sent');
do_inc_sent(_Packet) ->
ignore.
qos_sent(?QOS_0) ->
inc('messages/qos0/sent');
qos_sent(?QOS_1) ->
inc('messages/qos1/sent');
qos_sent(?QOS_2) ->
inc('messages/qos2/sent').
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
% Create metrics table
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
{ok, #{}, hibernate}.
% Create counters array
CRef = counters:new(?MAX_SIZE, [write_concurrency]),
ok = persistent_term:put(?MODULE, CRef),
% Create index mapping table
ok = emqx_tables:new(?TAB, [protected, {keypos, 2}, {read_concurrency, true}]),
% Store reserved indices
lists:foreach(fun({Type, Name}) ->
Idx = reserved_idx(Name),
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]),
{reply, {error, metric_index_exceeded}, State};
handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
case ets:lookup(?TAB, Name) of
[#metric{idx = Idx}] ->
?LOG(warning, "[Metrics] ~s already exists.", [Name]),
{reply, {ok, Idx}, State};
[] ->
Metric = #metric{name = Name, type = Type, idx = NextIdx},
true = ets:insert(?TAB, Metric),
{reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}}
end;
handle_call(Req, _From, State) ->
?LOG(error, "[Metrics] Unexpected call: ~p", [Req]),
@ -329,9 +383,65 @@ handle_info(Info, State) ->
?LOG(error, "[Metrics] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{}) ->
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
reserved_idx('bytes.received') -> 01;
reserved_idx('bytes.sent') -> 02;
reserved_idx('packets.received') -> 03;
reserved_idx('packets.sent') -> 04;
reserved_idx('packets.connect.received') -> 05;
reserved_idx('packets.connack.sent') -> 06;
reserved_idx('packets.connack.error') -> 07;
reserved_idx('packets.connack.auth_error') -> 08;
reserved_idx('packets.publish.received') -> 09;
reserved_idx('packets.publish.sent') -> 10;
reserved_idx('packets.publish.error') -> 11;
reserved_idx('packets.publish.auth_error') -> 12;
reserved_idx('packets.puback.received') -> 13;
reserved_idx('packets.puback.sent') -> 14;
reserved_idx('packets.puback.missed') -> 15;
reserved_idx('packets.pubrec.received') -> 16;
reserved_idx('packets.pubrec.sent') -> 17;
reserved_idx('packets.pubrec.missed') -> 18;
reserved_idx('packets.pubrel.received') -> 19;
reserved_idx('packets.pubrel.sent') -> 20;
reserved_idx('packets.pubrel.missed') -> 21;
reserved_idx('packets.pubcomp.received') -> 22;
reserved_idx('packets.pubcomp.sent') -> 23;
reserved_idx('packets.pubcomp.missed') -> 24;
reserved_idx('packets.subscribe.received') -> 25;
reserved_idx('packets.subscribe.error') -> 26;
reserved_idx('packets.subscribe.auth_error') -> 27;
reserved_idx('packets.suback.sent') -> 28;
reserved_idx('packets.unsubscribe.received') -> 29;
reserved_idx('packets.unsubscribe.error') -> 30;
reserved_idx('packets.unsuback.sent') -> 31;
reserved_idx('packets.pingreq.received') -> 32;
reserved_idx('packets.pingresp.sent') -> 33;
reserved_idx('packets.disconnect.received') -> 34;
reserved_idx('packets.disconnect.sent') -> 35;
reserved_idx('packets.auth.received') -> 36;
reserved_idx('packets.auth.sent') -> 37;
reserved_idx('messages.received') -> 38;
reserved_idx('messages.sent') -> 39;
reserved_idx('messages.qos0.received') -> 40;
reserved_idx('messages.qos0.sent') -> 41;
reserved_idx('messages.qos1.received') -> 42;
reserved_idx('messages.qos1.sent') -> 43;
reserved_idx('messages.qos2.received') -> 44;
reserved_idx('messages.qos2.expired') -> 45;
reserved_idx('messages.qos2.sent') -> 46;
reserved_idx('messages.qos2.dropped') -> 47;
reserved_idx('messages.retained') -> 48;
reserved_idx('messages.dropped') -> 49;
reserved_idx('messages.expired') -> 50;
reserved_idx('messages.forward') -> 51;
reserved_idx(_) -> undefined.

View File

@ -699,8 +699,8 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send})
{ok, PState};
{ok, Data} ->
trace(send, Packet),
emqx_metrics:sent(Packet),
emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
emqx_metrics:inc_sent(Packet),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
{ok, inc_stats(send, Type, PState)};
{error, Reason} ->
{error, Reason}

View File

@ -160,8 +160,8 @@ stats_fun() ->
case ets:info(?ROUTE, size) of
undefined -> ok;
Size ->
emqx_stats:setstat('routes/count', 'routes/max', Size),
emqx_stats:setstat('topics/count', 'topics/max', Size)
emqx_stats:setstat('routes.count', 'routes.max', Size),
emqx_stats:setstat('topics.count', 'topics.max', Size)
end.
cleanup_routes(Node) ->

View File

@ -422,7 +422,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
end;
true ->
?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
ok = emqx_metrics:inc('messages.qos2.dropped'),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
@ -434,7 +434,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
false ->
?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
ok = emqx_metrics:inc('packets.pubrec.missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -446,7 +446,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
error ->
?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
ok = emqx_metrics:inc('packets.pubrel.missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -496,7 +496,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
false ->
?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/puback/missed'),
ok = emqx_metrics:inc('packets.puback.missed'),
State
end);
@ -508,7 +508,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
false ->
?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
ok = emqx_metrics:inc('packets.pubcomp.missed'),
State
end);
@ -587,7 +587,6 @@ handle_info({timeout, Timer, emit_stats},
State = #state{client_id = ClientId,
stats_timer = Timer,
gc_state = GcState}) ->
emqx_metrics:commit(),
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy),
@ -652,7 +651,6 @@ terminate(Reason, #state{will_msg = WillMsg,
username = Username,
conn_pid = ConnPid,
old_conn_pid = OldConnPid}) ->
emqx_metrics:commit(),
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
@ -728,7 +726,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
{publish, {PacketId, Msg}} ->
case emqx_message:is_expired(Msg) of
true ->
emqx_metrics:trans(inc, 'messages/expired'),
ok = emqx_metrics:inc('messages.expired'),
emqx_inflight:delete(PacketId, Inflight);
false ->
redeliver({PacketId, Msg}, State),
@ -770,7 +768,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
Timeout = get_env(Zone, await_rel_timeout),
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
emqx_metrics:trans(inc, 'messages/qos2/expired'),
ok = emqx_metrics:inc('messages.qos2.expired'),
?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
Age ->

View File

@ -364,7 +364,7 @@ cleanup_down(SubPid) ->
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
update_stats(State) ->
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)),
emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)),
State.
%% Return 'true' if the subscriber process is alive AND not in the failed list

View File

@ -284,8 +284,8 @@ clean_down(Session = {ClientId, SessPid}) ->
end.
stats_fun() ->
safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max').
safe_update_stats(?SESSION_TAB, 'sessions.count', 'sessions.max'),
safe_update_stats(?SESSION_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max').
safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of

View File

@ -48,8 +48,8 @@
]).
-record(update, {name, countdown, interval, func}).
-record(state, {timer, updates :: [#update{}],
tick_ms :: timeout()}).
-record(state, {timer, updates :: [#update{}], tick_ms :: timeout()}).
-type(stats() :: list({atom(), non_neg_integer()})).
@ -57,41 +57,41 @@
%% Connection stats
-define(CONNECTION_STATS, [
'connections/count', % current connections
'connections/max' % maximum connections connected
'connections.count', % current connections
'connections.max' % maximum connections connected
]).
%% Session stats
-define(SESSION_STATS, [
'sessions/count',
'sessions/max',
'sessions/persistent/count',
'sessions/persistent/max'
'sessions.count',
'sessions.max',
'sessions.persistent.count',
'sessions.persistent.max'
]).
%% Subscribers, Subscriptions stats
-define(PUBSUB_STATS, [
'topics/count',
'topics/max',
'suboptions/count',
'suboptions/max',
'subscribers/count',
'subscribers/max',
'subscriptions/count',
'subscriptions/max',
'subscriptions/shared/count',
'subscriptions/shared/max'
'topics.count',
'topics.max',
'suboptions.count',
'suboptions.max',
'subscribers.count',
'subscribers.max',
'subscriptions.count',
'subscriptions.max',
'subscriptions.shared.count',
'subscriptions.shared.max'
]).
-define(ROUTE_STATS, [
'routes/count',
'routes/max'
'routes.count',
'routes.max'
]).
%% Retained stats
-define(RETAINED_STATS, [
'retained/count',
'retained/max'
'retained.count',
'retained.max'
]).
-define(TAB, ?MODULE).
@ -181,7 +181,8 @@ start_timer(#state{tick_ms = Ms} = State) ->
State#state{timer = emqx_misc:start_timer(Ms, tick)}.
handle_call(stop, _From, State) ->
{stop, normal, _Reply = ok, State};
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Stats] Unexpected call: ~p", [Req]),
{reply, ignored, State}.

View File

@ -184,8 +184,11 @@ publish(stats, Stats) ->
[safe_publish(systop(lists:concat(['stats/', Stat])), integer_to_binary(Val))
|| {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
publish(metrics, Metrics) ->
[safe_publish(systop(lists:concat(['metrics/', Metric])), integer_to_binary(Val))
|| {Metric, Val} <- Metrics, is_atom(Metric), is_integer(Val)].
[safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
|| {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
metric_topic(Name) ->
lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
safe_publish(Topic, Payload) ->
safe_publish(Topic, #{}, Payload).

View File

@ -190,12 +190,12 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState,
?LOG(debug, "[WS Connection] RECV ~p", [Data]),
BinSize = iolist_size(Data),
emqx_pd:update_counter(recv_oct, BinSize),
emqx_metrics:trans(inc, 'bytes/received', BinSize),
ok = emqx_metrics:inc('bytes.received', BinSize),
try emqx_frame:parse(iolist_to_binary(Data), ParseState) of
{more, ParseState1} ->
{ok, State#state{parse_state = ParseState1}};
{ok, Packet, Rest} ->
emqx_metrics:received(Packet),
ok = emqx_metrics:inc_recv(Packet),
emqx_pd:update_counter(recv_cnt, 1),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
@ -255,7 +255,6 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
websocket_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_metrics:commit(),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{ok, State#state{stats_timer = undefined}, hibernate};

View File

@ -157,13 +157,13 @@ start_session(_) ->
%% Metric Group
%%--------------------------------------------------------------------
inc_dec_metric(_) ->
emqx_metrics:inc(gauge, 'messages/retained', 10),
emqx_metrics:dec(gauge, 'messages/retained', 10).
emqx_metrics:inc('messages.retained', 10),
emqx_metrics:dec('messages.retained', 10).
%%--------------------------------------------------------------------
%% Stats Group
%%--------------------------------------------------------------------
set_get_stat(_) ->
emqx_stats:setstat('retained/max', 99),
99 = emqx_stats:getstat('retained/max').
emqx_stats:setstat('retained.max', 99),
99 = emqx_stats:getstat('retained.max').

View File

@ -18,41 +18,57 @@
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [t_inc_dec_metrics, t_trans].
all() -> [t_inc_dec, t_inc_recv, t_inc_sent, t_trans].
t_inc_dec_metrics(_) ->
t_inc_dec(_) ->
{ok, _} = emqx_metrics:start_link(),
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:inc('bytes/received'),
emqx_metrics:inc({counter, 'bytes/received'}, 2),
emqx_metrics:inc(counter, 'bytes/received', 1),
emqx_metrics:inc('bytes/received', 1),
emqx_metrics:inc({gauge, 'messages/retained'}, 2),
emqx_metrics:inc(gauge, 'messages/retained', 2),
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:dec(gauge, 'messages/retained'),
emqx_metrics:dec(gauge, 'messages/retained', 1),
2 = emqx_metrics:val('messages/retained'),
emqx_metrics:set('messages/retained', 3),
3 = emqx_metrics:val('messages/retained'),
emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
{1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
{1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
?assertEqual(0, emqx_metrics:val('bytes.received')),
?assertEqual(0, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:inc('bytes.received'),
ok = emqx_metrics:inc('bytes.received', 2),
ok = emqx_metrics:inc('bytes.received', 2),
?assertEqual(5, emqx_metrics:val('bytes.received')),
ok = emqx_metrics:inc('messages.retained', 2),
ok = emqx_metrics:inc('messages.retained', 2),
?assertEqual(4, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:dec('messages.retained'),
ok = emqx_metrics:dec('messages.retained', 1),
?assertEqual(2, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:set('messages.retained', 3),
?assertEqual(3, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:stop().
t_inc_recv(_) ->
{ok, _} = emqx_metrics:start_link(),
ok = emqx_metrics:inc_recv(?PACKET(?CONNECT)),
?assertEqual(1, emqx_metrics:val('packets.received')),
?assertEqual(1, emqx_metrics:val('packets.connect.received')),
ok = emqx_metrics:stop().
t_inc_sent(_) ->
{ok, _} = emqx_metrics:start_link(),
ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
?assertEqual(1, emqx_metrics:val('packets.sent')),
?assertEqual(1, emqx_metrics:val('packets.connack.sent')),
ok = emqx_metrics:stop().
t_trans(_) ->
{ok, _} = emqx_metrics:start_link(),
emqx_metrics:trans(inc, 'bytes/received'),
emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2),
emqx_metrics:trans(inc, counter, 'bytes/received', 2),
emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2),
emqx_metrics:trans(inc, gauge, 'messages/retained', 2),
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:commit(),
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:trans(dec, gauge, 'messages/retained'),
emqx_metrics:trans(dec, gauge, 'messages/retained', 1),
4 = emqx_metrics:val('messages/retained'),
emqx_metrics:commit(),
2 = emqx_metrics:val('messages/retained').
ok = emqx_metrics:trans(inc, 'bytes.received'),
ok = emqx_metrics:trans(inc, 'bytes.received', 2),
?assertEqual(0, emqx_metrics:val('bytes.received')),
ok = emqx_metrics:trans(inc, 'messages.retained', 2),
ok = emqx_metrics:trans(inc, 'messages.retained', 2),
?assertEqual(0, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:commit(),
?assertEqual(3, emqx_metrics:val('bytes.received')),
?assertEqual(4, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:trans(dec, 'messages.retained'),
ok = emqx_metrics:trans(dec, 'messages.retained', 1),
?assertEqual(4, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:commit(),
?assertEqual(2, emqx_metrics:val('messages.retained')),
ok = emqx_metrics:stop().

View File

@ -18,27 +18,27 @@
get_state_test() ->
with_proc(fun() ->
SetConnsCount = emqx_stats:statsfun('connections/count'),
SetConnsCount = emqx_stats:statsfun('connections.count'),
SetConnsCount(1),
1 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 2),
2 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 'connections/max', 3),
1 = emqx_stats:getstat('connections.count'),
emqx_stats:setstat('connections.count', 2),
2 = emqx_stats:getstat('connections.count'),
emqx_stats:setstat('connections.count', 'connections.max', 3),
timer:sleep(100),
3 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
emqx_stats:setstat('connections/count', 'connections/max', 2),
3 = emqx_stats:getstat('connections.count'),
3 = emqx_stats:getstat('connections.max'),
emqx_stats:setstat('connections.count', 'connections.max', 2),
timer:sleep(100),
2 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
SetConns = emqx_stats:statsfun('connections/count', 'connections/max'),
2 = emqx_stats:getstat('connections.count'),
3 = emqx_stats:getstat('connections.max'),
SetConns = emqx_stats:statsfun('connections.count', 'connections.max'),
SetConns(4),
timer:sleep(100),
4 = emqx_stats:getstat('connections/count'),
4 = emqx_stats:getstat('connections/max'),
4 = emqx_stats:getstat('connections.count'),
4 = emqx_stats:getstat('connections.max'),
Conns = emqx_stats:getstats(),
4 = proplists:get_value('connections/count', Conns),
4 = proplists:get_value('connections/max', Conns)
4 = proplists:get_value('connections.count', Conns),
4 = proplists:get_value('connections.max', Conns)
end).
update_interval_test() ->
@ -46,10 +46,10 @@ update_interval_test() ->
with_proc(fun() ->
SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks
emqx_stats:cancel_update(cm_stats),
UpdFun = fun() -> emqx_stats:setstat('connections/count', 1) end,
UpdFun = fun() -> emqx_stats:setstat('connections.count', 1) end,
ok = emqx_stats:update_interval(stats_test, UpdFun),
timer:sleep(SleepMs),
?assertEqual(1, emqx_stats:getstat('connections/count'))
?assertEqual(1, emqx_stats:getstat('connections.count'))
end, TickMs).
helper_test_() ->