diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e1d4011fc..2f596068c 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -16,15 +16,12 @@ -behaviour(gen_server). --define(LOG_HEADER, "[TCP]"). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -export([start_link/3]). --export([info/1, attrs/1]). --export([stats/1]). +-export([info/1, attrs/1, stats/1]). -export([kick/1]). -export([session/1]). @@ -46,11 +43,12 @@ stats_timer, incoming, rate_limit, - publish_limit, + pub_limit, limit_timer, idle_timeout }). +-define(LOG_HEADER, "[TCP]"). -define(DEFAULT_ACTIVE_N, 100). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -65,22 +63,22 @@ start_link(Transport, Socket, Options) -> info(CPid) when is_pid(CPid) -> call(CPid, info); -info(#state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - conn_state = ConnState, - active_n = ActiveN, - rate_limit = RateLimit, - publish_limit = PubLimit, - proto_state = ProtoState}) -> +info(#state{transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + conn_state = ConnState, + active_n = ActiveN, + rate_limit = RateLimit, + pub_limit = PubLimit, + proto_state = ProtoState}) -> ConnInfo = [{socktype, Transport:type(Socket)}, {peername, Peername}, {sockname, Sockname}, {conn_state, ConnState}, {active_n, ActiveN}, {rate_limit, esockd_rate_limit:info(RateLimit)}, - {publish_limit, esockd_rate_limit:info(PubLimit)}], + {pub_limit, esockd_rate_limit:info(PubLimit)}], ProtoInfo = emqx_protocol:info(ProtoState), lists:usort(lists:append(ConnInfo, ProtoInfo)). @@ -139,22 +137,21 @@ init([Transport, RawSocket, Options]) -> peercert => Peercert, sendfun => SendFun}, Options), ParserState = emqx_protocol:parser(ProtoState), - State = run_socket(#state{transport = Transport, - socket = Socket, - peername = Peername, - conn_state = running, - active_n = ActiveN, - rate_limit = RateLimit, - publish_limit = PubLimit, - proto_state = ProtoState, - parser_state = ParserState, - enable_stats = EnableStats, - idle_timeout = IdleTimout + State = run_socket(#state{transport = Transport, + socket = Socket, + peername = Peername, + conn_state = running, + active_n = ActiveN, + rate_limit = RateLimit, + pub_limit = PubLimit, + proto_state = ProtoState, + parser_state = ParserState, + enable_stats = EnableStats, + idle_timeout = IdleTimout }), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), ok = emqx_misc:init_proc_mng_policy(Zone), - emqx_logger:set_metadata_peername(esockd_net:format(Peername)), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); @@ -213,6 +210,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> {error, Reason} -> shutdown(Reason, State) end; + handle_info({timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState @@ -231,6 +229,7 @@ handle_info({timeout, Timer, emit_stats}, ?LOG(warning, "shutdown due to ~p", [Reason]), shutdown(Reason, NewState) end; + handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -331,9 +330,9 @@ handle_packet(<<>>, State) -> handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> - case catch emqx_frame:parse(Data, ParserState) of - {more, NewParserState} -> - {noreply, State#state{parser_state = NewParserState}, IdleTimeout}; + try emqx_frame:parse(Data, ParserState) of + {more, ParserState1} -> + {noreply, State#state{parser_state = ParserState1}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of @@ -348,11 +347,12 @@ handle_packet(Data, State = #state{proto_state = ProtoState, {stop, Error, ProtoState1} -> stop(Error, State#state{proto_state = ProtoState1}) end; - {error, Error} -> - ?LOG(error, "Framing error - ~p", [Error]), - shutdown(Error, State); - {'EXIT', Reason} -> - ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data]), + {error, Reason} -> + ?LOG(error, "Parse frame error - ~p", [Reason]), + shutdown(Reason, State) + catch + _:Error -> + ?LOG(error, "Parse failed for ~p~nError data:~p", [Error, Data]), shutdown(parse_error, State) end. @@ -370,9 +370,9 @@ inc_publish_cnt(_Type, State) -> %% Ensure rate limit %%------------------------------------------------------------------------------ -ensure_rate_limit(State = #state{rate_limit = Rl, publish_limit = Pl, +ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl, incoming = #{packets := Packets, bytes := Bytes}}) -> - ensure_rate_limit([{Pl, #state.publish_limit, Packets}, + ensure_rate_limit([{Pl, #state.pub_limit, Packets}, {Rl, #state.rate_limit, Bytes}], State). ensure_rate_limit([], State) -> @@ -421,3 +421,4 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. +