From 86ea9c844fd1d624ba2a039b816479c76c02a5d4 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 21 Oct 2015 21:18:36 +0800 Subject: [PATCH 1/2] fix issue #350 --- src/emqttd_sysmon.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 1031ee478..8ba4ff569 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -150,7 +150,11 @@ suppress(Key, SuccFun, State = #state{events = Events}) -> end. procinfo(Pid) -> - emqttd_vm:get_process_info(Pid) ++ emqttd_vm:get_process_gc(Pid). + case {emqttd_vm:get_process_info(Pid), emqttd_vm:get_process_gc(Pid)} of + {undefined, _} -> undefined; + {_, undefined} -> undefined; + {Info, GcInfo} -> Info ++ GcInfo + end. publish(Sysmon, WarnMsg) -> Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)), From 52d63125d343fb985a14112d11e8114f7353ead8 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 21 Oct 2015 21:36:56 +0800 Subject: [PATCH 2/2] improve log and fix issue #353 --- src/emqttd_client.erl | 91 +++++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index ddddccbda..de7c10766 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -58,6 +58,13 @@ packet_opts, keepalive}). +-define(DEBUG(Format, Args, State), + lager:debug("Client(~s): " ++ Format, + [emqttd_net:format(State#state.peername) | Args])). +-define(ERROR(Format, Args, State), + lager:error("Client(~s): " ++ Format, + [emqttd_net:format(State#state.peername) | Args])). + start_link(SockArgs, MqttEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. @@ -81,7 +88,6 @@ init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> {ok, NewSock} = esockd_connection:accept(SockArgs), {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), - lager:info("Connect from ~s", [ConnStr]), SendFun = fun(Data) -> Transport:send(NewSock, Data) end, PktOpts = proplists:get_value(packet, MqttEnv), ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), @@ -109,8 +115,8 @@ handle_call(info, _From, State = #state{conn_name = ConnName, handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; -handle_call(Req, _From, State = #state{peername = Peername}) -> - lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]), +handle_call(Req, _From, State) -> + ?ERROR("Unexpected request: ~p", [Req], State), {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> @@ -119,8 +125,8 @@ handle_cast({subscribe, TopicTable}, State) -> handle_cast({unsubscribe, Topics}, State) -> with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); -handle_cast(Msg, State = #state{peername = Peername}) -> - lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), +handle_cast(Msg, State) -> + ?ERROR("Unexpected msg: ~p",[Msg], State), {noreply, State}. handle_info(timeout, State) -> @@ -151,44 +157,41 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peer handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); -handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) -> - lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), +handle_info({inet_reply, _Sock, {error, Reason}}, State) -> + ?ERROR("Unexpected inet_reply - ~p", [Reason], State), {noreply, State}; -handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> - lager:debug("Client(~s): Start KeepAlive with ~p seconds", - [emqttd_net:format(Peername), TimeoutSec]), +handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> + ?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State), StatFun = fun() -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - {error, Error} -> {error, Error} + {error, Error} -> {error, Error} end end, KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), noreply(State#state{keepalive = KeepAlive}); -handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) -> +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> noreply(State#state{keepalive = KeepAlive1}); {error, timeout} -> - lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]), + ?DEBUG("Keepalive Timeout!", [], State), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {error, Error} -> - lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]), + ?DEBUG("Keepalive Error - ~p", [Error], State), stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; -handle_info(Info, State = #state{peername = Peername}) -> - lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), +handle_info(Info, State) -> + ?ERROR("Unexpected info: ~p", [Info], State), {noreply, State}. -terminate(Reason, #state{peername = Peername, - transport = Transport, +terminate(Reason, #state{transport = Transport, socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), if Reason == {shutdown, conn_closed} -> ok; @@ -196,7 +199,7 @@ terminate(Reason, #state{peername = Peername, end, case {ProtoState, Reason} of {undefined, _} -> ok; - {_, {shutdown, Error}} -> + {_, {shutdown, Error}} -> emqttd_protocol:shutdown(Error, ProtoState); {_, Reason} -> emqttd_protocol:shutdown(Reason, ProtoState) @@ -223,34 +226,36 @@ received(<<>>, State) -> {noreply, State, hibernate}; received(Bytes, State = #state{packet_opts = PacketOpts, - parser = Parser, - proto_state = ProtoState, - conn_name = ConnStr}) -> - case Parser(Bytes) of - {more, NewParser} -> - noreply(control_throttle(State#state{parser = NewParser})); - {ok, Packet, Rest} -> - emqttd_metrics:received(Packet), - case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), - proto_state = ProtoState1}); + parser = Parser, + proto_state = ProtoState}) -> + case catch Parser(Bytes) of + {more, NewParser} -> + noreply(control_throttle(State#state{parser = NewParser})); + {ok, Packet, Rest} -> + emqttd_metrics:received(Packet), + case emqttd_protocol:received(Packet, ProtoState) of + {ok, ProtoState1} -> + received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), + proto_state = ProtoState1}); + {error, Error} -> + ?ERROR("Protocol error - ~p", [Error], State), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#state{proto_state = ProtoState1}) + end; {error, Error} -> - lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), + ?ERROR("Framing error - ~p", [Error], State), stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) - end; - {error, Error} -> - lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]), - stop({shutdown, Error}, State) + {'EXIT', Reason} -> + ?ERROR("Parser failed for ~p~nError Frame: ~p", [Reason, Bytes], State), + {stop, {shutdown, frame_error}, State} end. network_error(Reason, State = #state{peername = Peername}) -> - lager:warning("Client(~s): MQTT detected network error '~p'", - [emqttd_net:format(Peername), Reason]), + lager:warning("Client(~s): network error - ~p", + [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). run_socket(State = #state{conn_state = blocked}) ->