diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 24c41cccd..55b21dc6e 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -111,6 +111,7 @@ handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [PeerName, Data]), + emqtt_metrics:inc('bytes/received', size(Data)), process_received_bytes(Data, control_throttle(State #state{await_recv = false})); @@ -170,6 +171,7 @@ process_received_bytes(Bytes, State = #state{parse_state = ParseState, control_throttle(State #state{parse_state = ParseState1}), hibernate}; {ok, Packet, Rest} -> + received_stats(Packet), case emqtt_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(), @@ -211,3 +213,21 @@ control_throttle(State = #state{conn_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. +received_stats(?PACKET(Type)) -> + emqtt_metrics:inc('packets/received'), + inc(Type). +inc(?CONNECT) -> + emqtt_metrics:inc('packets/connect'); +inc(?PUBLISH) -> + emqtt_metrics:inc('packets/publish/received'); +inc(?SUBSCRIBE) -> + emqtt_metrics:inc('packets/subscribe'); +inc(?UNSUBSCRIBE) -> + emqtt_metrics:inc('packets/unsubscribe'); +inc(?PINGREQ) -> + emqtt_metrics:inc('packets/pingreq'); +inc(?DISCONNECT) -> + emqtt_metrics:inc('packets/disconnect'); +inc(_) -> + ignore. + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 6a842f096..aa59a16fa 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -193,8 +193,10 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), + sent_stats(Packet), Data = emqtt_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [PeerName, Data]), + emqtt_metrics:inc('bytes/sent', size(Data)), Transport:send(Sock, Data), {ok, State}. @@ -299,3 +301,19 @@ validate_qos(_) -> false. try_unregister(undefined, _) -> ok; try_unregister(ClientId, _) -> emqtt_cm:unregister(ClientId, self()). +sent_stats(?PACKET(Type)) -> + emqtt_metrics:inc('packets/sent'), + inc(Type). +inc(?CONNACK) -> + emqtt_metrics:inc('packets/connack'); +inc(?PUBLISH) -> + emqtt_metrics:inc('packets/publish/sent'); +inc(?SUBACK) -> + emqtt_metrics:inc('packets/suback'); +inc(?UNSUBACK) -> + emqtt_metrics:inc('packets/unsuback'); +inc(?PINGRESP) -> + emqtt_metrics:inc('packets/pingresp'); +inc(_) -> + ingore. +