packets stats

This commit is contained in:
Feng Lee 2015-03-08 13:22:47 +08:00
parent 8f81048302
commit 1fc9eb287d
2 changed files with 38 additions and 0 deletions

View File

@ -111,6 +111,7 @@ handle_info({inet_reply, _Ref, ok}, State) ->
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) ->
lager:debug("RECV from ~s: ~p", [PeerName, Data]),
emqtt_metrics:inc('bytes/received', size(Data)),
process_received_bytes(Data,
control_throttle(State #state{await_recv = false}));
@ -170,6 +171,7 @@ process_received_bytes(Bytes, State = #state{parse_state = ParseState,
control_throttle(State #state{parse_state = ParseState1}),
hibernate};
{ok, Packet, Rest} ->
received_stats(Packet),
case emqtt_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(),
@ -211,3 +213,21 @@ control_throttle(State = #state{conn_state = Flow,
stop(Reason, State ) ->
{stop, Reason, State}.
received_stats(?PACKET(Type)) ->
emqtt_metrics:inc('packets/received'),
inc(Type).
inc(?CONNECT) ->
emqtt_metrics:inc('packets/connect');
inc(?PUBLISH) ->
emqtt_metrics:inc('packets/publish/received');
inc(?SUBSCRIBE) ->
emqtt_metrics:inc('packets/subscribe');
inc(?UNSUBSCRIBE) ->
emqtt_metrics:inc('packets/unsubscribe');
inc(?PINGREQ) ->
emqtt_metrics:inc('packets/pingreq');
inc(?DISCONNECT) ->
emqtt_metrics:inc('packets/disconnect');
inc(_) ->
ignore.

View File

@ -193,8 +193,10 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
sent_stats(Packet),
Data = emqtt_serialiser:serialise(Packet),
lager:debug("SENT to ~s: ~p", [PeerName, Data]),
emqtt_metrics:inc('bytes/sent', size(Data)),
Transport:send(Sock, Data),
{ok, State}.
@ -299,3 +301,19 @@ validate_qos(_) -> false.
try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqtt_cm:unregister(ClientId, self()).
sent_stats(?PACKET(Type)) ->
emqtt_metrics:inc('packets/sent'),
inc(Type).
inc(?CONNACK) ->
emqtt_metrics:inc('packets/connack');
inc(?PUBLISH) ->
emqtt_metrics:inc('packets/publish/sent');
inc(?SUBACK) ->
emqtt_metrics:inc('packets/suback');
inc(?UNSUBACK) ->
emqtt_metrics:inc('packets/unsuback');
inc(?PINGRESP) ->
emqtt_metrics:inc('packets/pingresp');
inc(_) ->
ingore.