fix(active_n): revert the changes to active_n

This commit is contained in:
Shawn 2021-07-05 11:17:35 +08:00
parent 8b3fcde380
commit fb809a5c08
14 changed files with 44 additions and 42 deletions

View File

@ -1806,7 +1806,7 @@ zones.internal {
bind: "127.0.0.1:11883" bind: "127.0.0.1:11883"
acceptors: 4 acceptors: 4
max_connections: 1024000 max_connections: 1024000
tcp.active: 1000 tcp.active_n: 1000
tcp.backlog: 512 tcp.backlog: 512
} }
} }
@ -1959,10 +1959,10 @@ example_common_tcp_options {
## ##
## See: https://erlang.org/doc/man/inet.html#setopts-2 ## See: https://erlang.org/doc/man/inet.html#setopts-2
## ##
## @doc listeners.<name>.tcp.active ## @doc listeners.<name>.tcp.active_n
## ValueType: Number ## ValueType: Number
## Default: 100 ## Default: 100
tcp.active: 100 tcp.active_n: 100
## TCP backlog defines the maximum length that the queue of ## TCP backlog defines the maximum length that the queue of
## pending connections can grow to. ## pending connections can grow to.

View File

@ -84,7 +84,7 @@
%% Sock State %% Sock State
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
%% The {active, N} option %% The {active, N} option
active :: pos_integer(), active_n :: pos_integer(),
%% Limiter %% Limiter
limiter :: maybe(emqx_limiter:limiter()), limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer %% Limit Timer
@ -108,7 +108,7 @@
-type(state() :: #state{}). -type(state() :: #state{}).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -165,7 +165,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) -> info(sockstate, #state{sockstate = SockSt}) ->
SockSt; SockSt;
info(active, #state{active = ActiveN}) -> info(active_n, #state{active_n = ActiveN}) ->
ActiveN; ActiveN;
info(stats_timer, #state{stats_timer = StatsTimer}) -> info(stats_timer, #state{stats_timer = StatsTimer}) ->
StatsTimer; StatsTimer;
@ -254,7 +254,7 @@ init_state(Transport, Socket, Options) ->
conn_mod => ?MODULE conn_mod => ?MODULE
}, },
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
PubLimit = emqx_zone:publish_limit(Zone), PubLimit = emqx_zone:publish_limit(Zone),
BytesIn = proplists:get_value(rate_limit, Options), BytesIn = proplists:get_value(rate_limit, Options),
RateLimit = emqx_zone:ratelimit(Zone), RateLimit = emqx_zone:ratelimit(Zone),
@ -272,7 +272,7 @@ init_state(Transport, Socket, Options) ->
peername = Peername, peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = idle, sockstate = idle,
active = ActiveN, active_n = ActiveN,
limiter = Limiter, limiter = Limiter,
parse_state = ParseState, parse_state = ParseState,
serialize = Serialize, serialize = Serialize,
@ -452,12 +452,12 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active = ActiveN} = State) -> #state{active_n = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
%% Something sent %% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
true -> true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs), Pubs = emqx_pd:reset_counter(outgoing_pubs),
@ -800,7 +800,7 @@ activate_socket(State = #state{sockstate = blocked}) ->
{ok, State}; {ok, State};
activate_socket(State = #state{transport = Transport, activate_socket(State = #state{transport = Transport,
socket = Socket, socket = Socket,
active = N}) -> active_n = N}) ->
case Transport:setopts(Socket, [{active, N}]) of case Transport:setopts(Socket, [{active, N}]) of
ok -> {ok, State#state{sockstate = running}}; ok -> {ok, State#state{sockstate = running}};
Error -> Error Error -> Error

View File

@ -199,7 +199,9 @@ ssl_opts(Opts) ->
maps:get(ssl, Opts, #{})))). maps:get(ssl, Opts, #{})))).
tcp_opts(Opts) -> tcp_opts(Opts) ->
maps:to_list(maps:get(tcp, Opts, #{})). maps:to_list(
maps:without([active_n],
maps:get(tcp, Opts, #{}))).
is_ssl(Opts) -> is_ssl(Opts) ->
emqx_config:deep_get([ssl, enable], Opts, false). emqx_config:deep_get([ssl, enable], Opts, false).

View File

@ -379,7 +379,7 @@ fields("ws_opts") ->
]; ];
fields("tcp_opts") -> fields("tcp_opts") ->
[ {"active", t(integer(), undefined, 100)} [ {"active_n", t(integer(), undefined, 100)}
, {"backlog", t(integer(), undefined, 1024)} , {"backlog", t(integer(), undefined, 1024)}
, {"send_timeout", t(duration(), undefined, "15s")} , {"send_timeout", t(duration(), undefined, "15s")}
, {"send_timeout_close", t(boolean(), undefined, true)} , {"send_timeout_close", t(boolean(), undefined, true)}

View File

@ -62,8 +62,8 @@
sockname :: emqx_types:peername(), sockname :: emqx_types:peername(),
%% Sock state %% Sock state
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
%% Simulate the active opt %% Simulate the active_n opt
active :: pos_integer(), active_n :: pos_integer(),
%% MQTT Piggyback %% MQTT Piggyback
mqtt_piggyback :: single | multiple, mqtt_piggyback :: single | multiple,
%% Limiter %% Limiter
@ -93,7 +93,7 @@
-type(ws_cmd() :: {active, boolean()}|close). -type(ws_cmd() :: {active, boolean()}|close).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -124,7 +124,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) -> info(sockstate, #state{sockstate = SockSt}) ->
SockSt; SockSt;
info(active, #state{active = ActiveN}) -> info(active_n, #state{active_n = ActiveN}) ->
ActiveN; ActiveN;
info(limiter, #state{limiter = Limiter}) -> info(limiter, #state{limiter = Limiter}) ->
maybe_apply(fun emqx_limiter:info/1, Limiter); maybe_apply(fun emqx_limiter:info/1, Limiter);
@ -293,7 +293,7 @@ websocket_init([Req, Opts]) ->
BytesIn = proplists:get_value(rate_limit, Opts), BytesIn = proplists:get_value(rate_limit, Opts),
RateLimit = emqx_zone:ratelimit(Zone), RateLimit = emqx_zone:ratelimit(Zone),
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
ActiveN = proplists:get_value(active, Opts, ?ACTIVE_N), ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
FrameOpts = emqx_zone:mqtt_frame_options(Zone), FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
@ -309,7 +309,7 @@ websocket_init([Req, Opts]) ->
{ok, #state{peername = Peername, {ok, #state{peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = running, sockstate = running,
active = ActiveN, active_n = ActiveN,
mqtt_piggyback = MQTTPiggyback, mqtt_piggyback = MQTTPiggyback,
limiter = Limiter, limiter = Limiter,
parse_state = ParseState, parse_state = ParseState,
@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) ->
return(check_oom(run_gc(Stats, State))); return(check_oom(run_gc(Stats, State)));
websocket_info(Deliver = {deliver, _Topic, _Msg}, websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{active = ActiveN}) -> State = #state{active_n = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
@ -551,7 +551,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_incoming(Packet, State = #state{active = ActiveN}) handle_incoming(Packet, State = #state{active_n = ActiveN})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
@ -586,7 +586,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
%% Handle outgoing packets %% Handle outgoing packets
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_outgoing(Packets, State = #state{active = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) ->
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),

View File

@ -120,7 +120,7 @@ t_info(_) ->
end end
end), end),
#{sockinfo := SockInfo} = emqx_connection:info(CPid), #{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertMatch(#{active := 100, ?assertMatch(#{active_n := 100,
peername := {{127,0,0,1},3456}, peername := {{127,0,0,1},3456},
sockname := {{127,0,0,1},1883}, sockname := {{127,0,0,1},1883},
sockstate := idle, sockstate := idle,
@ -219,8 +219,8 @@ t_handle_msg_deliver(_) ->
t_handle_msg_inet_reply(_) -> t_handle_msg_inet_reply(_) ->
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active => 0}))), ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active => 100}))), ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
?assertMatch({stop, {shutdown, for_testing}, _St}, ?assertMatch({stop, {shutdown, for_testing}, _St},
handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
@ -386,7 +386,7 @@ t_start_link_exit_on_activate(_) ->
t_get_conn_info(_) -> t_get_conn_info(_) ->
with_conn(fun(CPid) -> with_conn(fun(CPid) ->
#{sockinfo := SockInfo} = emqx_connection:info(CPid), #{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertEqual(#{active => 100, ?assertEqual(#{active_n => 100,
peername => {{127,0,0,1},3456}, peername => {{127,0,0,1},3456},
sockname => {{127,0,0,1},1883}, sockname => {{127,0,0,1},1883},
sockstate => running, sockstate => running,

View File

@ -265,7 +265,7 @@ t_connect_idle_timeout(_) ->
t_connect_limit_timeout(_) -> t_connect_limit_timeout(_) ->
ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]),
meck:expect(proplists, get_value, fun(active, _Options, _Default) -> 1; meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1;
(Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3])
end), end),

View File

@ -118,7 +118,7 @@ t_info(_) ->
end), end),
#{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info), #{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info),
#{socktype := ws, #{socktype := ws,
active := 100, active_n := 100,
peername := {{127,0,0,1}, 3456}, peername := {{127,0,0,1}, 3456},
sockname := {{127,0,0,1}, 18083}, sockname := {{127,0,0,1}, 18083},
sockstate := running sockstate := running

View File

@ -303,7 +303,7 @@ sockinfo(#state{peername = Peername}) ->
peername => Peername, peername => Peername,
sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock? sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock?
sockstate => running, sockstate => running,
active => 1 active_n => 1
}. }.
%% copies from emqx_channel:info/1 %% copies from emqx_channel:info/1

View File

@ -49,7 +49,7 @@ exproto.listener.protoname.max_conn_rate = 1000
## Specify the {active, N} option for the external MQTT/TCP Socket. ## Specify the {active, N} option for the external MQTT/TCP Socket.
## ##
## Value: Number ## Value: Number
exproto.listener.protoname.active = 100 exproto.listener.protoname.active_n = 100
## Idle timeout ## Idle timeout
## ##

View File

@ -78,7 +78,7 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "exproto.listener.$proto.active", "emqx_exproto.listeners", [ {mapping, "exproto.listener.$proto.active_n", "emqx_exproto.listeners", [
{default, 100}, {default, 100},
{datatype, integer} {datatype, integer}
]}. ]}.
@ -250,7 +250,7 @@ end}.
end, end,
ConnOpts = fun(Prefix) -> ConnOpts = fun(Prefix) ->
Filter([{active, cuttlefish:conf_get(Prefix ++ ".active", Conf, undefined)}, Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}]) {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}])
end, end,

View File

@ -61,7 +61,7 @@
%% Sock State %% Sock State
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
%% The {active, N} option %% The {active, N} option
active :: pos_integer(), active_n :: pos_integer(),
%% BACKW: e4.2.0-e4.2.1 %% BACKW: e4.2.0-e4.2.1
%% We should remove it %% We should remove it
sendfun :: function() | undefined, sendfun :: function() | undefined,
@ -84,7 +84,7 @@
-type(state() :: #state{}). -type(state() :: #state{}).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -137,7 +137,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) -> info(sockstate, #state{sockstate = SockSt}) ->
SockSt; SockSt;
info(active, #state{active = ActiveN}) -> info(active_n, #state{active_n = ActiveN}) ->
ActiveN. ActiveN.
-spec(stats(pid()|state()) -> emqx_types:stats()). -spec(stats(pid()|state()) -> emqx_types:stats()).
@ -240,7 +240,7 @@ init_state(WrappedSock, Peername, Options) ->
conn_mod => ?MODULE conn_mod => ?MODULE
}, },
ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
%% FIXME: %% FIXME:
%%Limiter = emqx_limiter:init(Options), %%Limiter = emqx_limiter:init(Options),
@ -255,7 +255,7 @@ init_state(WrappedSock, Peername, Options) ->
peername = Peername, peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = idle, sockstate = idle,
active = ActiveN, active_n = ActiveN,
sendfun = undefined, sendfun = undefined,
limiter = undefined, limiter = undefined,
channel = Channel, channel = Channel,
@ -403,13 +403,13 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{active = ActiveN}) -> State = #state{active_n = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
%% Something sent %% Something sent
%% TODO: Who will deliver this message? %% TODO: Who will deliver this message?
handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
true -> true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt), Pubs = emqx_pd:reset_counter(outgoing_pkt),
@ -652,7 +652,7 @@ activate_socket(State = #state{sockstate = closed}) ->
activate_socket(State = #state{sockstate = blocked}) -> activate_socket(State = #state{sockstate = blocked}) ->
{ok, State}; {ok, State};
activate_socket(State = #state{socket = Socket, activate_socket(State = #state{socket = Socket,
active = N}) -> active_n = N}) ->
%% FIXME: Works on dtls/udp ??? %% FIXME: Works on dtls/udp ???
%% How to hanlde buffer? %% How to hanlde buffer?
case esockd_setopts(Socket, [{active, N}]) of case esockd_setopts(Socket, [{active, N}]) of

View File

@ -459,7 +459,7 @@ sockinfo(#lwm2m_state{peername = Peername}) ->
peername => Peername, peername => Peername,
sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock? sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock?
sockstate => running, sockstate => running,
active => 1 active_n => 1
}. }.
%% copies from emqx_channel:info/1 %% copies from emqx_channel:info/1

View File

@ -97,7 +97,7 @@
pending_topic_ids = #{} :: pending_msgs() pending_topic_ids = #{} :: pending_msgs()
}). }).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).