refactor metrics
This commit is contained in:
parent
6a66ca90b1
commit
ebf203a931
|
@ -110,7 +110,7 @@ handle_call(kick, _From, State) ->
|
|||
{stop, {shutdown, kick}, ok, State};
|
||||
|
||||
handle_call(Req, _From, State = #state{peername = Peername}) ->
|
||||
lager:error("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
|
||||
lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]),
|
||||
{reply, {error, unsupported_request}, State}.
|
||||
|
||||
handle_cast({subscribe, TopicTable}, State) ->
|
||||
|
@ -120,7 +120,7 @@ handle_cast({unsubscribe, Topics}, State) ->
|
|||
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
||||
|
||||
handle_cast(Msg, State = #state{peername = Peername}) ->
|
||||
lager:error("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
|
||||
lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
|
@ -152,11 +152,12 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|||
network_error(Reason, State);
|
||||
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) ->
|
||||
lager:error("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
|
||||
lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
|
||||
lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
|
||||
lager:debug("Client(~s): Start KeepAlive with ~p seconds",
|
||||
[emqttd_net:format(Peername), TimeoutSec]),
|
||||
StatFun = fun() ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
|
@ -169,13 +170,12 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport
|
|||
handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
|
||||
noreply(State#state{keepalive = KeepAlive1});
|
||||
{error, timeout} ->
|
||||
lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
|
||||
lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]),
|
||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||
{error, Error} ->
|
||||
lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
|
||||
lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
|
||||
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
||||
end;
|
||||
|
||||
|
@ -228,9 +228,9 @@ received(Bytes, State = #state{packet_opts = PacketOpts,
|
|||
conn_name = ConnStr}) ->
|
||||
case Parser(Bytes) of
|
||||
{more, NewParser} ->
|
||||
{noreply, control_throttle(State #state{parser = NewParser}), hibernate};
|
||||
noreply(control_throttle(State#state{parser = NewParser}));
|
||||
{ok, Packet, Rest} ->
|
||||
received_stats(Packet),
|
||||
emqttd_metrics:received(Packet),
|
||||
case emqttd_protocol:received(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
||||
|
@ -244,12 +244,12 @@ received(Bytes, State = #state{packet_opts = PacketOpts,
|
|||
stop(Reason, State#state{proto_state = ProtoState1})
|
||||
end;
|
||||
{error, Error} ->
|
||||
lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]),
|
||||
lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]),
|
||||
stop({shutdown, Error}, State)
|
||||
end.
|
||||
|
||||
network_error(Reason, State = #state{peername = Peername}) ->
|
||||
lager:warning("Client ~s: MQTT detected network error '~p'",
|
||||
lager:warning("Client(~s): MQTT detected network error '~p'",
|
||||
[emqttd_net:format(Peername), Reason]),
|
||||
stop({shutdown, conn_closed}, State).
|
||||
|
||||
|
@ -269,21 +269,3 @@ control_throttle(State = #state{conn_state = Flow,
|
|||
{_, _} -> run_socket(State)
|
||||
end.
|
||||
|
||||
received_stats(?PACKET(Type)) ->
|
||||
emqttd_metrics:inc('packets/received'), inc(Type).
|
||||
inc(?CONNECT) ->
|
||||
emqttd_metrics:inc('packets/connect');
|
||||
inc(?PUBLISH) ->
|
||||
emqttd_metrics:inc('messages/received'),
|
||||
emqttd_metrics:inc('packets/publish/received');
|
||||
inc(?SUBSCRIBE) ->
|
||||
emqttd_metrics:inc('packets/subscribe');
|
||||
inc(?UNSUBSCRIBE) ->
|
||||
emqttd_metrics:inc('packets/unsubscribe');
|
||||
inc(?PINGREQ) ->
|
||||
emqttd_metrics:inc('packets/pingreq');
|
||||
inc(?DISCONNECT) ->
|
||||
emqttd_metrics:inc('packets/disconnect');
|
||||
inc(_) ->
|
||||
ignore.
|
||||
|
||||
|
|
Loading…
Reference in New Issue