improve log and fix issue #353
This commit is contained in:
parent
86ea9c844f
commit
52d63125d3
|
@ -58,6 +58,13 @@
|
||||||
packet_opts,
|
packet_opts,
|
||||||
keepalive}).
|
keepalive}).
|
||||||
|
|
||||||
|
-define(DEBUG(Format, Args, State),
|
||||||
|
lager:debug("Client(~s): " ++ Format,
|
||||||
|
[emqttd_net:format(State#state.peername) | Args])).
|
||||||
|
-define(ERROR(Format, Args, State),
|
||||||
|
lager:error("Client(~s): " ++ Format,
|
||||||
|
[emqttd_net:format(State#state.peername) | Args])).
|
||||||
|
|
||||||
start_link(SockArgs, MqttEnv) ->
|
start_link(SockArgs, MqttEnv) ->
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
|
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
|
||||||
|
|
||||||
|
@ -81,7 +88,6 @@ init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
|
||||||
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
||||||
{ok, Peername} = emqttd_net:peername(Sock),
|
{ok, Peername} = emqttd_net:peername(Sock),
|
||||||
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
||||||
lager:info("Connect from ~s", [ConnStr]),
|
|
||||||
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
|
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
|
||||||
PktOpts = proplists:get_value(packet, MqttEnv),
|
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
||||||
|
@ -109,8 +115,8 @@ handle_call(info, _From, State = #state{conn_name = ConnName,
|
||||||
handle_call(kick, _From, State) ->
|
handle_call(kick, _From, State) ->
|
||||||
{stop, {shutdown, kick}, ok, State};
|
{stop, {shutdown, kick}, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State = #state{peername = Peername}) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]),
|
?ERROR("Unexpected request: ~p", [Req], State),
|
||||||
{reply, {error, unsupported_request}, State}.
|
{reply, {error, unsupported_request}, State}.
|
||||||
|
|
||||||
handle_cast({subscribe, TopicTable}, State) ->
|
handle_cast({subscribe, TopicTable}, State) ->
|
||||||
|
@ -119,8 +125,8 @@ handle_cast({subscribe, TopicTable}, State) ->
|
||||||
handle_cast({unsubscribe, Topics}, State) ->
|
handle_cast({unsubscribe, Topics}, State) ->
|
||||||
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
||||||
|
|
||||||
handle_cast(Msg, State = #state{peername = Peername}) ->
|
handle_cast(Msg, State) ->
|
||||||
lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
|
?ERROR("Unexpected msg: ~p",[Msg], State),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
|
@ -151,44 +157,41 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peer
|
||||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
||||||
network_error(Reason, State);
|
network_error(Reason, State);
|
||||||
|
|
||||||
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) ->
|
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
|
?ERROR("Unexpected inet_reply - ~p", [Reason], State),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
|
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
|
||||||
lager:debug("Client(~s): Start KeepAlive with ~p seconds",
|
?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State),
|
||||||
[emqttd_net:format(Peername), TimeoutSec]),
|
|
||||||
StatFun = fun() ->
|
StatFun = fun() ->
|
||||||
case Transport:getstat(Socket, [recv_oct]) of
|
case Transport:getstat(Socket, [recv_oct]) of
|
||||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||||
{error, Error} -> {error, Error}
|
{error, Error} -> {error, Error}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
|
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
|
||||||
noreply(State#state{keepalive = KeepAlive});
|
noreply(State#state{keepalive = KeepAlive});
|
||||||
|
|
||||||
handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
|
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
||||||
case emqttd_keepalive:check(KeepAlive) of
|
case emqttd_keepalive:check(KeepAlive) of
|
||||||
{ok, KeepAlive1} ->
|
{ok, KeepAlive1} ->
|
||||||
noreply(State#state{keepalive = KeepAlive1});
|
noreply(State#state{keepalive = KeepAlive1});
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]),
|
?DEBUG("Keepalive Timeout!", [], State),
|
||||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
|
?DEBUG("Keepalive Error - ~p", [Error], State),
|
||||||
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State = #state{peername = Peername}) ->
|
handle_info(Info, State) ->
|
||||||
lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
|
?ERROR("Unexpected info: ~p", [Info], State),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{peername = Peername,
|
terminate(Reason, #state{transport = Transport,
|
||||||
transport = Transport,
|
|
||||||
socket = Socket,
|
socket = Socket,
|
||||||
keepalive = KeepAlive,
|
keepalive = KeepAlive,
|
||||||
proto_state = ProtoState}) ->
|
proto_state = ProtoState}) ->
|
||||||
lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]),
|
|
||||||
emqttd_keepalive:cancel(KeepAlive),
|
emqttd_keepalive:cancel(KeepAlive),
|
||||||
if
|
if
|
||||||
Reason == {shutdown, conn_closed} -> ok;
|
Reason == {shutdown, conn_closed} -> ok;
|
||||||
|
@ -196,7 +199,7 @@ terminate(Reason, #state{peername = Peername,
|
||||||
end,
|
end,
|
||||||
case {ProtoState, Reason} of
|
case {ProtoState, Reason} of
|
||||||
{undefined, _} -> ok;
|
{undefined, _} -> ok;
|
||||||
{_, {shutdown, Error}} ->
|
{_, {shutdown, Error}} ->
|
||||||
emqttd_protocol:shutdown(Error, ProtoState);
|
emqttd_protocol:shutdown(Error, ProtoState);
|
||||||
{_, Reason} ->
|
{_, Reason} ->
|
||||||
emqttd_protocol:shutdown(Reason, ProtoState)
|
emqttd_protocol:shutdown(Reason, ProtoState)
|
||||||
|
@ -223,34 +226,36 @@ received(<<>>, State) ->
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
received(Bytes, State = #state{packet_opts = PacketOpts,
|
received(Bytes, State = #state{packet_opts = PacketOpts,
|
||||||
parser = Parser,
|
parser = Parser,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState}) ->
|
||||||
conn_name = ConnStr}) ->
|
case catch Parser(Bytes) of
|
||||||
case Parser(Bytes) of
|
{more, NewParser} ->
|
||||||
{more, NewParser} ->
|
noreply(control_throttle(State#state{parser = NewParser}));
|
||||||
noreply(control_throttle(State#state{parser = NewParser}));
|
{ok, Packet, Rest} ->
|
||||||
{ok, Packet, Rest} ->
|
emqttd_metrics:received(Packet),
|
||||||
emqttd_metrics:received(Packet),
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
{ok, ProtoState1} ->
|
||||||
{ok, ProtoState1} ->
|
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
||||||
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
proto_state = ProtoState1});
|
||||||
proto_state = ProtoState1});
|
{error, Error} ->
|
||||||
|
?ERROR("Protocol error - ~p", [Error], State),
|
||||||
|
stop({shutdown, Error}, State);
|
||||||
|
{error, Error, ProtoState1} ->
|
||||||
|
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||||
|
{stop, Reason, ProtoState1} ->
|
||||||
|
stop(Reason, State#state{proto_state = ProtoState1})
|
||||||
|
end;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
?ERROR("Framing error - ~p", [Error], State),
|
||||||
stop({shutdown, Error}, State);
|
stop({shutdown, Error}, State);
|
||||||
{error, Error, ProtoState1} ->
|
{'EXIT', Reason} ->
|
||||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
?ERROR("Parser failed for ~p~nError Frame: ~p", [Reason, Bytes], State),
|
||||||
{stop, Reason, ProtoState1} ->
|
{stop, {shutdown, frame_error}, State}
|
||||||
stop(Reason, State#state{proto_state = ProtoState1})
|
|
||||||
end;
|
|
||||||
{error, Error} ->
|
|
||||||
lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]),
|
|
||||||
stop({shutdown, Error}, State)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
network_error(Reason, State = #state{peername = Peername}) ->
|
network_error(Reason, State = #state{peername = Peername}) ->
|
||||||
lager:warning("Client(~s): MQTT detected network error '~p'",
|
lager:warning("Client(~s): network error - ~p",
|
||||||
[emqttd_net:format(Peername), Reason]),
|
[emqttd_net:format(Peername), Reason]),
|
||||||
stop({shutdown, conn_closed}, State).
|
stop({shutdown, conn_closed}, State).
|
||||||
|
|
||||||
run_socket(State = #state{conn_state = blocked}) ->
|
run_socket(State = #state{conn_state = blocked}) ->
|
||||||
|
|
Loading…
Reference in New Issue