Set {active, N} for ssl connection (#2531)

This commit is contained in:
JianBo He 2019-05-13 10:18:01 +08:00 committed by tigercl
parent 311f1e6df1
commit 97cca1a5ba
3 changed files with 30 additions and 12 deletions

View File

@ -1,7 +1,7 @@
language: erlang
otp_release:
- 21.2
- 21.3
before_install:
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..

View File

@ -863,8 +863,11 @@ listener.tcp.external.zone = external
## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.tcp.external.rate_limit = 1024,4096
## listener.tcp.external.rate_limit = 1024,52428800
## The access control rules for the MQTT/TCP listener.
##
@ -994,8 +997,11 @@ listener.tcp.internal.zone = internal
## See: listener.tcp.$name.rate_limit
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.tcp.internal.rate_limit = 1000000,2000000
## listener.tcp.internal.rate_limit = 1000000,524288000
## The TCP backlog of internal MQTT/TCP Listener.
##
@ -1104,8 +1110,11 @@ listener.ssl.external.access.1 = allow all
## Rate limit for the external MQTT/SSL connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * active_n)/2`
## Unit: Bps
## listener.ssl.external.rate_limit = 1024,4096
## listener.ssl.external.rate_limit = 1024,52428800
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
@ -1338,8 +1347,11 @@ listener.ws.external.max_conn_rate = 1000
## Rate limit for the MQTT/WebSocket connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * 1)/2`
## Unit: Bps
## listener.ws.external.rate_limit = 1024,4096
## listener.ws.external.rate_limit = 1024,524288
## Zone of the external MQTT/WebSocket listener belonged to.
##
@ -1546,8 +1558,11 @@ listener.wss.external.max_conn_rate = 1000
## Rate limit for the MQTT/WebSocket/SSL connections.
##
## Value: rate,burst
## - rate: The average limit value for per second
## - burst: The maximum allowed for each check, To avoid frequent restriction
## this value is recommended to be set to `(max_packet_size * 1)/2`
## Unit: Bps
## listener.wss.external.rate_limit = 1024,4096
## listener.wss.external.rate_limit = 1024,524288
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
##

View File

@ -317,9 +317,15 @@ handle(info, {tcp_passive, _Sock}, State) ->
ok = activate_socket(NState),
{keep_state, NState};
handle(info, {ssl_passive, _Sock}, State) ->
%% Rate limit here:)
NState = ensure_rate_limit(State),
ok = activate_socket(NState),
{keep_state, NState};
handle(info, activate_socket, State) ->
%% Rate limit timer expired.
ok = activate_socket(State),
ok = activate_socket(State#state{conn_state = running}),
{keep_state, State#state{conn_state = running, limit_timer = undefined}};
handle(info, {inet_reply, _Sock, ok}, State) ->
@ -442,6 +448,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
{0, Rl1} ->
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
{Pause, Rl1} ->
?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]),
TRef = erlang:send_after(Pause, self(), activate_socket),
setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1)
end.
@ -453,11 +460,7 @@ activate_socket(#state{conn_state = blocked}) ->
ok;
activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) ->
TrueOrN = case Transport:is_ssl(Socket) of
true -> true; %% Cannot set '{active, N}' for SSL:(
false -> N
end,
case Transport:setopts(Socket, [{active, TrueOrN}]) of
case Transport:setopts(Socket, [{active, N}]) of
ok -> ok;
{error, Reason} ->
self() ! {shutdown, Reason},