From 97cca1a5bae3d2220b5201d9822a24bbeebc2e01 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 13 May 2019 10:18:01 +0800 Subject: [PATCH] Set {active, N} for ssl connection (#2531) --- .travis.yml | 2 +- etc/emqx.conf | 25 ++++++++++++++++++++----- src/emqx_connection.erl | 15 +++++++++------ 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index bef59bf4c..abe6f7a6a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: erlang otp_release: - - 21.2 + - 21.3 before_install: - git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd .. diff --git a/etc/emqx.conf b/etc/emqx.conf index df50884e9..aa4acec3b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -863,8 +863,11 @@ listener.tcp.external.zone = external ## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.external.rate_limit = 1024,4096 +## listener.tcp.external.rate_limit = 1024,52428800 ## The access control rules for the MQTT/TCP listener. ## @@ -994,8 +997,11 @@ listener.tcp.internal.zone = internal ## See: listener.tcp.$name.rate_limit ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.tcp.internal.rate_limit = 1000000,2000000 +## listener.tcp.internal.rate_limit = 1000000,524288000 ## The TCP backlog of internal MQTT/TCP Listener. ## @@ -1104,8 +1110,11 @@ listener.ssl.external.access.1 = allow all ## Rate limit for the external MQTT/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * active_n)/2` ## Unit: Bps -## listener.ssl.external.rate_limit = 1024,4096 +## listener.ssl.external.rate_limit = 1024,52428800 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. @@ -1338,8 +1347,11 @@ listener.ws.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.ws.external.rate_limit = 1024,4096 +## listener.ws.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket listener belonged to. ## @@ -1546,8 +1558,11 @@ listener.wss.external.max_conn_rate = 1000 ## Rate limit for the MQTT/WebSocket/SSL connections. ## ## Value: rate,burst +## - rate: The average limit value for per second +## - burst: The maximum allowed for each check, To avoid frequent restriction +## this value is recommended to be set to `(max_packet_size * 1)/2` ## Unit: Bps -## listener.wss.external.rate_limit = 1024,4096 +## listener.wss.external.rate_limit = 1024,524288 ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 89b84dc6b..8ede37e57 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -317,9 +317,15 @@ handle(info, {tcp_passive, _Sock}, State) -> ok = activate_socket(NState), {keep_state, NState}; +handle(info, {ssl_passive, _Sock}, State) -> + %% Rate limit here:) + NState = ensure_rate_limit(State), + ok = activate_socket(NState), + {keep_state, NState}; + handle(info, activate_socket, State) -> %% Rate limit timer expired. - ok = activate_socket(State), + ok = activate_socket(State#state{conn_state = running}), {keep_state, State#state{conn_state = running, limit_timer = undefined}}; handle(info, {inet_reply, _Sock, ok}, State) -> @@ -442,6 +448,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> + ?LOG(debug, "[Connection] Rate limit pause connection ~pms", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. @@ -453,11 +460,7 @@ activate_socket(#state{conn_state = blocked}) -> ok; activate_socket(#state{transport = Transport, socket = Socket, active_n = N}) -> - TrueOrN = case Transport:is_ssl(Socket) of - true -> true; %% Cannot set '{active, N}' for SSL:( - false -> N - end, - case Transport:setopts(Socket, [{active, TrueOrN}]) of + case Transport:setopts(Socket, [{active, N}]) of ok -> ok; {error, Reason} -> self() ! {shutdown, Reason},