diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 58417b7cf..2179fba90 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -815,7 +815,6 @@ broker { ## - `flapping_detect.*` ## - `force_shutdown.*` ## - `conn_congestion.*` -## - `overall_max_connections` ## ## Syntax: zones. {} zones.default { @@ -833,7 +832,9 @@ zones.default { ## Default: true stats.enable: true - ## Maximum number of concurrent connections. + ## Maximum number of concurrent connections in this zone. + ## This value must be larger than the sum of `max_connections` set + ## in the listeners under this zone. ## ## @doc zones..overall_max_connections ## ValueType: Number | infinity diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 54d3a4691..1cbc36e05 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -83,8 +83,6 @@ sockname :: emqx_types:peername(), %% Sock State sockstate :: emqx_types:sockstate(), - %% The {active, N} option - active_n :: pos_integer(), %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -102,14 +100,18 @@ %% Idle Timeout idle_timeout :: integer(), %% Idle Timer - idle_timer :: maybe(reference()) + idle_timer :: maybe(reference()), + %% Zone name + zone :: atom(), + %% Listener Name + listener :: atom() }). -type(state() :: #state{}). -type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -166,8 +168,6 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active_n, #state{active_n = ActiveN}) -> - ActiveN; info(stats_timer, #state{stats_timer = StatsTimer}) -> StatsTimer; info(limit_timer, #state{limit_timer = LimitTimer}) -> @@ -254,8 +254,9 @@ init_state(Transport, Socket, Options) -> peercert => Peercert, conn_mod => ?MODULE }, - Zone = proplists:get_value(zone, Options), - ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), + Zone = maps:get(zone, Options), + Listener = maps:get(listener, Options), + PubLimit = emqx_zone:publish_limit(Zone), BytesIn = proplists:get_value(rate_limit, Options), RateLimit = emqx_zone:ratelimit(Zone), @@ -273,7 +274,6 @@ init_state(Transport, Socket, Options) -> peername = Peername, sockname = Sockname, sockstate = idle, - active_n = ActiveN, limiter = Limiter, parse_state = ParseState, serialize = Serialize, @@ -281,7 +281,9 @@ init_state(Transport, Socket, Options) -> gc_state = GcState, stats_timer = StatsTimer, idle_timeout = IdleTimeout, - idle_timer = IdleTimer + idle_timer = IdleTimer, + zone = Zone, + listener = Listener }. run_loop(Parent, State = #state{transport = Transport, @@ -452,14 +454,16 @@ handle_msg({Passive, _Sock}, State) NState1 = check_oom(run_gc(InStats, NState)), handle_info(activate_socket, NState1); -handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active_n = ActiveN} = State) -> +handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{zone = Zone, + listener = Listener} = State) -> + ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> - case emqx_pd:get_counter(outgoing_pubs) > ActiveN of +handle_msg({inet_reply, _Sock, ok}, State = #state{zone = Zone, listener = Listener}) -> + case emqx_pd:get_counter(outgoing_pubs) > + emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), Bytes = emqx_pd:reset_counter(outgoing_bytes), @@ -799,10 +803,10 @@ activate_socket(State = #state{sockstate = closed}) -> {ok, State}; activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; -activate_socket(State = #state{transport = Transport, - socket = Socket, - active_n = N}) -> - case Transport:setopts(Socket, [{active, N}]) of +activate_socket(State = #state{transport = Transport, socket = Socket, + zone = Zone, listener = Listener}) -> + ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), + case Transport:setopts(Socket, [{active, ActiveN}]) of ok -> {ok, State#state{sockstate = running}}; Error -> Error end. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 7bc68c271..b50505bf8 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -62,8 +62,6 @@ sockname :: emqx_types:peername(), %% Sock state sockstate :: emqx_types:sockstate(), - %% Simulate the active_n opt - active_n :: pos_integer(), %% MQTT Piggyback mqtt_piggyback :: single | multiple, %% Limiter @@ -85,7 +83,11 @@ %% Idle Timeout idle_timeout :: timeout(), %% Idle Timer - idle_timer :: maybe(reference()) + idle_timer :: maybe(reference()), + %% Zone name + zone :: atom(), + %% Listener Name + listener :: atom() }). -type(state() :: #state{}). @@ -93,7 +95,7 @@ -type(ws_cmd() :: {active, boolean()}|close). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -124,8 +126,6 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active_n, #state{active_n = ActiveN}) -> - ActiveN; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter); info(channel, #state{channel = Channel}) -> @@ -293,7 +293,6 @@ websocket_init([Req, Opts]) -> BytesIn = proplists:get_value(rate_limit, Opts), RateLimit = emqx_zone:ratelimit(Zone), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), - ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), @@ -309,7 +308,6 @@ websocket_init([Req, Opts]) -> {ok, #state{peername = Peername, sockname = Sockname, sockstate = running, - active_n = ActiveN, mqtt_piggyback = MQTTPiggyback, limiter = Limiter, parse_state = ParseState, @@ -372,7 +370,8 @@ websocket_info({check_gc, Stats}, State) -> return(check_oom(run_gc(Stats, State))); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> + State = #state{zone = Zone, listener = Listener}) -> + ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -551,11 +550,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> %% Handle incoming packet %%-------------------------------------------------------------------- -handle_incoming(Packet, State = #state{active_n = ActiveN}) +handle_incoming(Packet, State = #state{zone = Zone, listener = Listener}) when is_record(Packet, mqtt_packet) -> ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), - NState = case emqx_pd:get_counter(incoming_pubs) > ActiveN of + NState = case emqx_pd:get_counter(incoming_pubs) > + emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of true -> postpone({cast, rate_limit}, State); false -> State end, @@ -586,11 +586,13 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %% Handle outgoing packets %%-------------------------------------------------------------------- -handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> +handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, + zone = Zone, listener = Listener}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), - NState = case emqx_pd:get_counter(outgoing_pubs) > ActiveN of + NState = case emqx_pd:get_counter(outgoing_pubs) > + emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of true -> Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), oct => emqx_pd:reset_counter(outgoing_bytes)