diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 0dcb7285e..58417b7cf 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1806,7 +1806,7 @@ zones.internal { bind: "127.0.0.1:11883" acceptors: 4 max_connections: 1024000 - tcp.active: 1000 + tcp.active_n: 1000 tcp.backlog: 512 } } @@ -1959,10 +1959,10 @@ example_common_tcp_options { ## ## See: https://erlang.org/doc/man/inet.html#setopts-2 ## - ## @doc listeners..tcp.active + ## @doc listeners..tcp.active_n ## ValueType: Number ## Default: 100 - tcp.active: 100 + tcp.active_n: 100 ## TCP backlog defines the maximum length that the queue of ## pending connections can grow to. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 3363b013e..ab91c02b4 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -84,7 +84,7 @@ %% Sock State sockstate :: emqx_types:sockstate(), %% The {active, N} option - active :: pos_integer(), + active_n :: pos_integer(), %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -108,7 +108,7 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -165,7 +165,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active, #state{active = ActiveN}) -> +info(active_n, #state{active_n = ActiveN}) -> ActiveN; info(stats_timer, #state{stats_timer = StatsTimer}) -> StatsTimer; @@ -254,7 +254,7 @@ init_state(Transport, Socket, Options) -> conn_mod => ?MODULE }, Zone = proplists:get_value(zone, Options), - ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), + ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), PubLimit = emqx_zone:publish_limit(Zone), BytesIn = proplists:get_value(rate_limit, Options), RateLimit = emqx_zone:ratelimit(Zone), @@ -272,7 +272,7 @@ init_state(Transport, Socket, Options) -> peername = Peername, sockname = Sockname, sockstate = idle, - active = ActiveN, + active_n = ActiveN, limiter = Limiter, parse_state = ParseState, serialize = Serialize, @@ -452,12 +452,12 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active = ActiveN} = State) -> + #state{active_n = ActiveN} = State) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> +handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> case emqx_pd:get_counter(outgoing_pubs) > ActiveN of true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), @@ -800,7 +800,7 @@ activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{transport = Transport, socket = Socket, - active = N}) -> + active_n = N}) -> case Transport:setopts(Socket, [{active, N}]) of ok -> {ok, State#state{sockstate = running}}; Error -> Error diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index e3c6a5e22..d8407ec9e 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -199,7 +199,9 @@ ssl_opts(Opts) -> maps:get(ssl, Opts, #{})))). tcp_opts(Opts) -> - maps:to_list(maps:get(tcp, Opts, #{})). + maps:to_list( + maps:without([active_n], + maps:get(tcp, Opts, #{}))). is_ssl(Opts) -> emqx_config:deep_get([ssl, enable], Opts, false). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5a6044307..616f65bfd 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -379,7 +379,7 @@ fields("ws_opts") -> ]; fields("tcp_opts") -> - [ {"active", t(integer(), undefined, 100)} + [ {"active_n", t(integer(), undefined, 100)} , {"backlog", t(integer(), undefined, 1024)} , {"send_timeout", t(duration(), undefined, "15s")} , {"send_timeout_close", t(boolean(), undefined, true)} diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 8d7816acf..7bc68c271 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -62,8 +62,8 @@ sockname :: emqx_types:peername(), %% Sock state sockstate :: emqx_types:sockstate(), - %% Simulate the active opt - active :: pos_integer(), + %% Simulate the active_n opt + active_n :: pos_integer(), %% MQTT Piggyback mqtt_piggyback :: single | multiple, %% Limiter @@ -93,7 +93,7 @@ -type(ws_cmd() :: {active, boolean()}|close). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -124,7 +124,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active, #state{active = ActiveN}) -> +info(active_n, #state{active_n = ActiveN}) -> ActiveN; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter); @@ -293,7 +293,7 @@ websocket_init([Req, Opts]) -> BytesIn = proplists:get_value(rate_limit, Opts), RateLimit = emqx_zone:ratelimit(Zone), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), - ActiveN = proplists:get_value(active, Opts, ?ACTIVE_N), + ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), @@ -309,7 +309,7 @@ websocket_init([Req, Opts]) -> {ok, #state{peername = Peername, sockname = Sockname, sockstate = running, - active = ActiveN, + active_n = ActiveN, mqtt_piggyback = MQTTPiggyback, limiter = Limiter, parse_state = ParseState, @@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) -> return(check_oom(run_gc(Stats, State))); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{active = ActiveN}) -> + State = #state{active_n = ActiveN}) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -551,7 +551,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> %% Handle incoming packet %%-------------------------------------------------------------------- -handle_incoming(Packet, State = #state{active = ActiveN}) +handle_incoming(Packet, State = #state{active_n = ActiveN}) when is_record(Packet, mqtt_packet) -> ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), @@ -586,7 +586,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %% Handle outgoing packets %%-------------------------------------------------------------------- -handle_outgoing(Packets, State = #state{active = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> +handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 6bace4ccc..a6b2b614a 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -120,7 +120,7 @@ t_info(_) -> end end), #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertMatch(#{active := 100, + ?assertMatch(#{active_n := 100, peername := {{127,0,0,1},3456}, sockname := {{127,0,0,1},1883}, sockstate := idle, @@ -219,8 +219,8 @@ t_handle_msg_deliver(_) -> t_handle_msg_inet_reply(_) -> ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active => 0}))), - ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active => 100}))), + ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), + ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), ?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). @@ -386,7 +386,7 @@ t_start_link_exit_on_activate(_) -> t_get_conn_info(_) -> with_conn(fun(CPid) -> #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertEqual(#{active => 100, + ?assertEqual(#{active_n => 100, peername => {{127,0,0,1},3456}, sockname => {{127,0,0,1},1883}, sockstate => running, diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index e80643e96..8ce35b50c 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -265,7 +265,7 @@ t_connect_idle_timeout(_) -> t_connect_limit_timeout(_) -> ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), - meck:expect(proplists, get_value, fun(active, _Options, _Default) -> 1; + meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1; (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) end), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index a9a6b7792..6db831972 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -118,7 +118,7 @@ t_info(_) -> end), #{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info), #{socktype := ws, - active := 100, + active_n := 100, peername := {{127,0,0,1}, 3456}, sockname := {{127,0,0,1}, 18083}, sockstate := running diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index fa127cc8b..d465f9ca3 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -303,7 +303,7 @@ sockinfo(#state{peername = Peername}) -> peername => Peername, sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock? sockstate => running, - active => 1 + active_n => 1 }. %% copies from emqx_channel:info/1 diff --git a/apps/emqx_exproto/etc/emqx_exproto.conf b/apps/emqx_exproto/etc/emqx_exproto.conf index 687e97748..7a7667271 100644 --- a/apps/emqx_exproto/etc/emqx_exproto.conf +++ b/apps/emqx_exproto/etc/emqx_exproto.conf @@ -49,7 +49,7 @@ exproto.listener.protoname.max_conn_rate = 1000 ## Specify the {active, N} option for the external MQTT/TCP Socket. ## ## Value: Number -exproto.listener.protoname.active = 100 +exproto.listener.protoname.active_n = 100 ## Idle timeout ## diff --git a/apps/emqx_exproto/priv/emqx_exproto.schema b/apps/emqx_exproto/priv/emqx_exproto.schema index 6d0fb0fa8..4bd215847 100644 --- a/apps/emqx_exproto/priv/emqx_exproto.schema +++ b/apps/emqx_exproto/priv/emqx_exproto.schema @@ -78,7 +78,7 @@ end}. {datatype, integer} ]}. -{mapping, "exproto.listener.$proto.active", "emqx_exproto.listeners", [ +{mapping, "exproto.listener.$proto.active_n", "emqx_exproto.listeners", [ {default, 100}, {datatype, integer} ]}. @@ -250,7 +250,7 @@ end}. end, ConnOpts = fun(Prefix) -> - Filter([{active, cuttlefish:conf_get(Prefix ++ ".active", Conf, undefined)}, + Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)}, {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}]) end, diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index fb30e1e88..da655bcb4 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -61,7 +61,7 @@ %% Sock State sockstate :: emqx_types:sockstate(), %% The {active, N} option - active :: pos_integer(), + active_n :: pos_integer(), %% BACKW: e4.2.0-e4.2.1 %% We should remove it sendfun :: function() | undefined, @@ -84,7 +84,7 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -137,7 +137,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active, #state{active = ActiveN}) -> +info(active_n, #state{active_n = ActiveN}) -> ActiveN. -spec(stats(pid()|state()) -> emqx_types:stats()). @@ -240,7 +240,7 @@ init_state(WrappedSock, Peername, Options) -> conn_mod => ?MODULE }, - ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), + ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), %% FIXME: %%Limiter = emqx_limiter:init(Options), @@ -255,7 +255,7 @@ init_state(WrappedSock, Peername, Options) -> peername = Peername, sockname = Sockname, sockstate = idle, - active = ActiveN, + active_n = ActiveN, sendfun = undefined, limiter = undefined, channel = Channel, @@ -403,13 +403,13 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{active = ActiveN}) -> + State = #state{active_n = ActiveN}) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent %% TODO: Who will deliver this message? -handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> +handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> case emqx_pd:get_counter(outgoing_pkt) > ActiveN of true -> Pubs = emqx_pd:reset_counter(outgoing_pkt), @@ -652,7 +652,7 @@ activate_socket(State = #state{sockstate = closed}) -> activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{socket = Socket, - active = N}) -> + active_n = N}) -> %% FIXME: Works on dtls/udp ??? %% How to hanlde buffer? case esockd_setopts(Socket, [{active, N}]) of diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 9a8c0229a..55f992da6 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -459,7 +459,7 @@ sockinfo(#lwm2m_state{peername = Peername}) -> peername => Peername, sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock? sockstate => running, - active => 1 + active_n => 1 }. %% copies from emqx_channel:info/1 diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index bc1c11075..2339961cf 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -97,7 +97,7 @@ pending_topic_ids = #{} :: pending_msgs() }). --define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).