Improve the emqx_connection module

Rename 'publish_limit' to 'pub_limit'
'try ... of ... catch' to replace 'case catch'
This commit is contained in:
Feng Lee 2018-12-20 14:17:16 +08:00 committed by Feng Lee
parent d827604213
commit 6e1b47f1f9
1 changed files with 38 additions and 37 deletions

View File

@ -16,15 +16,12 @@
-behaviour(gen_server). -behaviour(gen_server).
-define(LOG_HEADER, "[TCP]").
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-export([start_link/3]). -export([start_link/3]).
-export([info/1, attrs/1]). -export([info/1, attrs/1, stats/1]).
-export([stats/1]).
-export([kick/1]). -export([kick/1]).
-export([session/1]). -export([session/1]).
@ -46,11 +43,12 @@
stats_timer, stats_timer,
incoming, incoming,
rate_limit, rate_limit,
publish_limit, pub_limit,
limit_timer, limit_timer,
idle_timeout idle_timeout
}). }).
-define(LOG_HEADER, "[TCP]").
-define(DEFAULT_ACTIVE_N, 100). -define(DEFAULT_ACTIVE_N, 100).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -65,22 +63,22 @@ start_link(Transport, Socket, Options) ->
info(CPid) when is_pid(CPid) -> info(CPid) when is_pid(CPid) ->
call(CPid, info); call(CPid, info);
info(#state{transport = Transport, info(#state{transport = Transport,
socket = Socket, socket = Socket,
peername = Peername, peername = Peername,
sockname = Sockname, sockname = Sockname,
conn_state = ConnState, conn_state = ConnState,
active_n = ActiveN, active_n = ActiveN,
rate_limit = RateLimit, rate_limit = RateLimit,
publish_limit = PubLimit, pub_limit = PubLimit,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)}, ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername}, {peername, Peername},
{sockname, Sockname}, {sockname, Sockname},
{conn_state, ConnState}, {conn_state, ConnState},
{active_n, ActiveN}, {active_n, ActiveN},
{rate_limit, esockd_rate_limit:info(RateLimit)}, {rate_limit, esockd_rate_limit:info(RateLimit)},
{publish_limit, esockd_rate_limit:info(PubLimit)}], {pub_limit, esockd_rate_limit:info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)). lists:usort(lists:append(ConnInfo, ProtoInfo)).
@ -139,22 +137,21 @@ init([Transport, RawSocket, Options]) ->
peercert => Peercert, peercert => Peercert,
sendfun => SendFun}, Options), sendfun => SendFun}, Options),
ParserState = emqx_protocol:parser(ProtoState), ParserState = emqx_protocol:parser(ProtoState),
State = run_socket(#state{transport = Transport, State = run_socket(#state{transport = Transport,
socket = Socket, socket = Socket,
peername = Peername, peername = Peername,
conn_state = running, conn_state = running,
active_n = ActiveN, active_n = ActiveN,
rate_limit = RateLimit, rate_limit = RateLimit,
publish_limit = PubLimit, pub_limit = PubLimit,
proto_state = ProtoState, proto_state = ProtoState,
parser_state = ParserState, parser_state = ParserState,
enable_stats = EnableStats, enable_stats = EnableStats,
idle_timeout = IdleTimout idle_timeout = IdleTimout
}), }),
GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
ok = emqx_gc:init(GcPolicy), ok = emqx_gc:init(GcPolicy),
ok = emqx_misc:init_proc_mng_policy(Zone), ok = emqx_misc:init_proc_mng_policy(Zone),
emqx_logger:set_metadata_peername(esockd_net:format(Peername)), emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
State, self(), IdleTimout); State, self(), IdleTimout);
@ -213,6 +210,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
{error, Reason} -> {error, Reason} ->
shutdown(Reason, State) shutdown(Reason, State)
end; end;
handle_info({timeout, Timer, emit_stats}, handle_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, State = #state{stats_timer = Timer,
proto_state = ProtoState proto_state = ProtoState
@ -231,6 +229,7 @@ handle_info({timeout, Timer, emit_stats},
?LOG(warning, "shutdown due to ~p", [Reason]), ?LOG(warning, "shutdown due to ~p", [Reason]),
shutdown(Reason, NewState) shutdown(Reason, NewState)
end; end;
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
@ -331,9 +330,9 @@ handle_packet(<<>>, State) ->
handle_packet(Data, State = #state{proto_state = ProtoState, handle_packet(Data, State = #state{proto_state = ProtoState,
parser_state = ParserState, parser_state = ParserState,
idle_timeout = IdleTimeout}) -> idle_timeout = IdleTimeout}) ->
case catch emqx_frame:parse(Data, ParserState) of try emqx_frame:parse(Data, ParserState) of
{more, NewParserState} -> {more, ParserState1} ->
{noreply, State#state{parser_state = NewParserState}, IdleTimeout}; {noreply, State#state{parser_state = ParserState1}, IdleTimeout};
{ok, Packet = ?PACKET(Type), Rest} -> {ok, Packet = ?PACKET(Type), Rest} ->
emqx_metrics:received(Packet), emqx_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of case emqx_protocol:received(Packet, ProtoState) of
@ -348,11 +347,12 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
{stop, Error, ProtoState1} -> {stop, Error, ProtoState1} ->
stop(Error, State#state{proto_state = ProtoState1}) stop(Error, State#state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Reason} ->
?LOG(error, "Framing error - ~p", [Error]), ?LOG(error, "Parse frame error - ~p", [Reason]),
shutdown(Error, State); shutdown(Reason, State)
{'EXIT', Reason} -> catch
?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data]), _:Error ->
?LOG(error, "Parse failed for ~p~nError data:~p", [Error, Data]),
shutdown(parse_error, State) shutdown(parse_error, State)
end. end.
@ -370,9 +370,9 @@ inc_publish_cnt(_Type, State) ->
%% Ensure rate limit %% Ensure rate limit
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
ensure_rate_limit(State = #state{rate_limit = Rl, publish_limit = Pl, ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl,
incoming = #{packets := Packets, bytes := Bytes}}) -> incoming = #{packets := Packets, bytes := Bytes}}) ->
ensure_rate_limit([{Pl, #state.publish_limit, Packets}, ensure_rate_limit([{Pl, #state.pub_limit, Packets},
{Rl, #state.rate_limit, Bytes}], State). {Rl, #state.rate_limit, Bytes}], State).
ensure_rate_limit([], State) -> ensure_rate_limit([], State) ->
@ -421,3 +421,4 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
ok = emqx_gc:inc(1, Oct); ok = emqx_gc:inc(1, Oct);
maybe_gc(_, _) -> maybe_gc(_, _) ->
ok. ok.