diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 5acb9e5d3..ddddccbda 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -110,7 +110,7 @@ handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call(Req, _From, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), + lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]), {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> @@ -120,7 +120,7 @@ handle_cast({unsubscribe, Topics}, State) -> with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); handle_cast(Msg, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), + lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), {noreply, State}. handle_info(timeout, State) -> @@ -152,11 +152,12 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), + lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), {noreply, State}; handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> - lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), + lager:debug("Client(~s): Start KeepAlive with ~p seconds", + [emqttd_net:format(Peername), TimeoutSec]), StatFun = fun() -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; @@ -169,13 +170,12 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), noreply(State#state{keepalive = KeepAlive1}); {error, timeout} -> - lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), + lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {error, Error} -> - lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; @@ -183,10 +183,10 @@ handle_info(Info, State = #state{peername = Peername}) -> lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), {noreply, State}. -terminate(Reason, #state{peername = Peername, - transport = Transport, - socket = Socket, - keepalive = KeepAlive, +terminate(Reason, #state{peername = Peername, + transport = Transport, + socket = Socket, + keepalive = KeepAlive, proto_state = ProtoState}) -> lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), @@ -228,9 +228,9 @@ received(Bytes, State = #state{packet_opts = PacketOpts, conn_name = ConnStr}) -> case Parser(Bytes) of {more, NewParser} -> - {noreply, control_throttle(State #state{parser = NewParser}), hibernate}; + noreply(control_throttle(State#state{parser = NewParser})); {ok, Packet, Rest} -> - received_stats(Packet), + emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), @@ -244,12 +244,12 @@ received(Bytes, State = #state{packet_opts = PacketOpts, stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> - lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]), + lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]), stop({shutdown, Error}, State) end. network_error(Reason, State = #state{peername = Peername}) -> - lager:warning("Client ~s: MQTT detected network error '~p'", + lager:warning("Client(~s): MQTT detected network error '~p'", [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). @@ -269,21 +269,3 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -received_stats(?PACKET(Type)) -> - emqttd_metrics:inc('packets/received'), inc(Type). -inc(?CONNECT) -> - emqttd_metrics:inc('packets/connect'); -inc(?PUBLISH) -> - emqttd_metrics:inc('messages/received'), - emqttd_metrics:inc('packets/publish/received'); -inc(?SUBSCRIBE) -> - emqttd_metrics:inc('packets/subscribe'); -inc(?UNSUBSCRIBE) -> - emqttd_metrics:inc('packets/unsubscribe'); -inc(?PINGREQ) -> - emqttd_metrics:inc('packets/pingreq'); -inc(?DISCONNECT) -> - emqttd_metrics:inc('packets/disconnect'); -inc(_) -> - ignore. -