From 700ec7aaefe6e63c2254fbd186cdf8c060f75b2b Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 22 Feb 2017 10:01:39 +0800 Subject: [PATCH] Add 'proto_stats' record --- src/emqttd_protocol.erl | 54 ++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index da235a44e..7ee9a7a6a 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -35,12 +35,14 @@ -export([process/2]). +-record(proto_stats, {recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}). + %% Protocol State -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser = false, will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN, - session, ws_initial_headers, %% Headers from first HTTP request for websocket client + session, stats, ws_initial_headers, %% Headers from first HTTP request for websocket client connected_at}). -type(proto_state() :: #proto_state{}). @@ -56,20 +58,20 @@ %% @doc Init protocol init(Peername, SendFun, Opts) -> - lists:foreach(fun(K) -> put(K, 0) end, ?STATS_KEYS), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, max_clientid_len = MaxLen, client_pid = self(), + stats = #proto_stats{}, ws_initial_headers = WsInitialHeaders}. info(ProtoState) -> ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). -stats(_ProtoState) -> - [{K, get(K)} || K <- ?STATS_KEYS]. +stats(#proto_state{stats = Stats}) -> + ?record_to_proplist(proto_stats, Stats). clientid(#proto_state{client_id = ClientId}) -> ClientId. @@ -106,8 +108,10 @@ session(#proto_state{session = Session}) -> %% A Client can only send the CONNECT Packet once over a Network Connection. -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). -received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) -> - process(Packet, State#proto_state{connected = true}); +received(Packet = ?PACKET(?CONNECT), + State = #proto_state{connected = false, stats = Stats}) -> + trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats), + process(Packet, State#proto_state{connected = true, stats = Stats1}); received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> {error, protocol_bad_connect, State}; @@ -116,11 +120,11 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> received(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; -received(Packet = ?PACKET(_Type), State) -> - trace(recv, Packet, State), +received(Packet = ?PACKET(Type), State = #proto_state{stats = Stats}) -> + trace(recv, Packet, State), Stats1 = inc_stats(recv, Type, Stats), case validate_packet(Packet) of ok -> - process(Packet, State); + process(Packet, State#proto_state{stats = Stats1}); {error, Reason} -> {error, Reason, State} end. @@ -151,7 +155,7 @@ unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId, %% @doc Send PUBREL pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State). -process(Packet = ?CONNECT_PACKET(Var), State0) -> +process(?CONNECT_PACKET(Var), State0) -> #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, @@ -170,8 +174,6 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> will_msg = willmsg(Var), connected_at = os:timestamp()}, - trace(recv, Packet, State1), - {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> @@ -312,22 +314,34 @@ send(Msg, State = #proto_state{client_id = ClientId, username = Username}) emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqttd_message:to_packet(Msg), State); -send(Packet, State = #proto_state{sendfun = SendFun}) - when is_record(Packet, mqtt_packet) -> +send(Packet = ?PACKET(Type), + State = #proto_state{sendfun = SendFun, stats = Stats}) -> trace(send, Packet, State), emqttd_metrics:sent(Packet), SendFun(Packet), - {ok, State}. + Stats1 = inc_stats(send, Type, Stats), + {ok, State#proto_state{stats = Stats1}}. -trace(recv, Packet = ?PACKET(Type), ProtoState) -> - inc(recv_pkt), ?IF(Type =:= ?PUBLISH, inc(recv_msg), ok), +trace(recv, Packet, ProtoState) -> ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); -trace(send, Packet = ?PACKET(Type), ProtoState) -> - inc(send_pkt), ?IF(Type =:= ?PUBLISH, inc(send_msg), ok), +trace(send, Packet, ProtoState) -> ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). -inc(Key) -> put(Key, get(Key) + 1). +inc_stats(recv, Type, Stats) -> + #proto_stats{recv_pkt = Pkt, recv_msg = Msg} = Stats, + inc_stats(Type, #proto_stats.recv_pkt, Pkt, #proto_stats.recv_msg, Msg, Stats); + +inc_stats(send, Type, Stats) -> + #proto_stats{send_pkt = Pkt, send_msg = Msg} = Stats, + inc_stats(Type, #proto_stats.send_pkt, Pkt, #proto_stats.send_msg, Msg, Stats). + +inc_stats(Type, PktPos, PktCnt, MsgPos, MsgCnt, Stats) -> + Stats1 = setelement(PktPos, Stats, PktCnt + 1), + case Type =:= ?PUBLISH of + true -> setelement(MsgPos, Stats1, MsgCnt + 1); + false -> Stats1 + end. stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH -> {stop, {shutdown, auth_failure}, State};