refactor(connection): remove active_n from state

This commit is contained in:
Shawn 2021-07-05 19:13:55 +08:00
parent 10b01a34ef
commit 969e72c82b
3 changed files with 40 additions and 33 deletions

View File

@ -815,7 +815,6 @@ broker {
## - `flapping_detect.*` ## - `flapping_detect.*`
## - `force_shutdown.*` ## - `force_shutdown.*`
## - `conn_congestion.*` ## - `conn_congestion.*`
## - `overall_max_connections`
## ##
## Syntax: zones.<zone-name> {} ## Syntax: zones.<zone-name> {}
zones.default { zones.default {
@ -833,7 +832,9 @@ zones.default {
## Default: true ## Default: true
stats.enable: true stats.enable: true
## Maximum number of concurrent connections. ## Maximum number of concurrent connections in this zone.
## This value must be larger than the sum of `max_connections` set
## in the listeners under this zone.
## ##
## @doc zones.<name>.overall_max_connections ## @doc zones.<name>.overall_max_connections
## ValueType: Number | infinity ## ValueType: Number | infinity

View File

@ -83,8 +83,6 @@
sockname :: emqx_types:peername(), sockname :: emqx_types:peername(),
%% Sock State %% Sock State
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
%% The {active, N} option
active_n :: pos_integer(),
%% Limiter %% Limiter
limiter :: maybe(emqx_limiter:limiter()), limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer %% Limit Timer
@ -102,14 +100,18 @@
%% Idle Timeout %% Idle Timeout
idle_timeout :: integer(), idle_timeout :: integer(),
%% Idle Timer %% Idle Timer
idle_timer :: maybe(reference()) idle_timer :: maybe(reference()),
%% Zone name
zone :: atom(),
%% Listener Name
listener :: atom()
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
-type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}). -type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -166,8 +168,6 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) -> info(sockstate, #state{sockstate = SockSt}) ->
SockSt; SockSt;
info(active_n, #state{active_n = ActiveN}) ->
ActiveN;
info(stats_timer, #state{stats_timer = StatsTimer}) -> info(stats_timer, #state{stats_timer = StatsTimer}) ->
StatsTimer; StatsTimer;
info(limit_timer, #state{limit_timer = LimitTimer}) -> info(limit_timer, #state{limit_timer = LimitTimer}) ->
@ -254,8 +254,9 @@ init_state(Transport, Socket, Options) ->
peercert => Peercert, peercert => Peercert,
conn_mod => ?MODULE conn_mod => ?MODULE
}, },
Zone = proplists:get_value(zone, Options), Zone = maps:get(zone, Options),
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), Listener = maps:get(listener, Options),
PubLimit = emqx_zone:publish_limit(Zone), PubLimit = emqx_zone:publish_limit(Zone),
BytesIn = proplists:get_value(rate_limit, Options), BytesIn = proplists:get_value(rate_limit, Options),
RateLimit = emqx_zone:ratelimit(Zone), RateLimit = emqx_zone:ratelimit(Zone),
@ -273,7 +274,6 @@ init_state(Transport, Socket, Options) ->
peername = Peername, peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = idle, sockstate = idle,
active_n = ActiveN,
limiter = Limiter, limiter = Limiter,
parse_state = ParseState, parse_state = ParseState,
serialize = Serialize, serialize = Serialize,
@ -281,7 +281,9 @@ init_state(Transport, Socket, Options) ->
gc_state = GcState, gc_state = GcState,
stats_timer = StatsTimer, stats_timer = StatsTimer,
idle_timeout = IdleTimeout, idle_timeout = IdleTimeout,
idle_timer = IdleTimer idle_timer = IdleTimer,
zone = Zone,
listener = Listener
}. }.
run_loop(Parent, State = #state{transport = Transport, run_loop(Parent, State = #state{transport = Transport,
@ -452,14 +454,16 @@ handle_msg({Passive, _Sock}, State)
NState1 = check_oom(run_gc(InStats, NState)), NState1 = check_oom(run_gc(InStats, NState)),
handle_info(activate_socket, NState1); handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg}, handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{zone = Zone,
#state{active_n = ActiveN} = State) -> listener = Listener} = State) ->
ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
%% Something sent %% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> handle_msg({inet_reply, _Sock, ok}, State = #state{zone = Zone, listener = Listener}) ->
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of case emqx_pd:get_counter(outgoing_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
true -> true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs), Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes), Bytes = emqx_pd:reset_counter(outgoing_bytes),
@ -799,10 +803,10 @@ activate_socket(State = #state{sockstate = closed}) ->
{ok, State}; {ok, State};
activate_socket(State = #state{sockstate = blocked}) -> activate_socket(State = #state{sockstate = blocked}) ->
{ok, State}; {ok, State};
activate_socket(State = #state{transport = Transport, activate_socket(State = #state{transport = Transport, socket = Socket,
socket = Socket, zone = Zone, listener = Listener}) ->
active_n = N}) -> ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
case Transport:setopts(Socket, [{active, N}]) of case Transport:setopts(Socket, [{active, ActiveN}]) of
ok -> {ok, State#state{sockstate = running}}; ok -> {ok, State#state{sockstate = running}};
Error -> Error Error -> Error
end. end.

View File

@ -62,8 +62,6 @@
sockname :: emqx_types:peername(), sockname :: emqx_types:peername(),
%% Sock state %% Sock state
sockstate :: emqx_types:sockstate(), sockstate :: emqx_types:sockstate(),
%% Simulate the active_n opt
active_n :: pos_integer(),
%% MQTT Piggyback %% MQTT Piggyback
mqtt_piggyback :: single | multiple, mqtt_piggyback :: single | multiple,
%% Limiter %% Limiter
@ -85,7 +83,11 @@
%% Idle Timeout %% Idle Timeout
idle_timeout :: timeout(), idle_timeout :: timeout(),
%% Idle Timer %% Idle Timer
idle_timer :: maybe(reference()) idle_timer :: maybe(reference()),
%% Zone name
zone :: atom(),
%% Listener Name
listener :: atom()
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
@ -93,7 +95,7 @@
-type(ws_cmd() :: {active, boolean()}|close). -type(ws_cmd() :: {active, boolean()}|close).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -124,8 +126,6 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname; Sockname;
info(sockstate, #state{sockstate = SockSt}) -> info(sockstate, #state{sockstate = SockSt}) ->
SockSt; SockSt;
info(active_n, #state{active_n = ActiveN}) ->
ActiveN;
info(limiter, #state{limiter = Limiter}) -> info(limiter, #state{limiter = Limiter}) ->
maybe_apply(fun emqx_limiter:info/1, Limiter); maybe_apply(fun emqx_limiter:info/1, Limiter);
info(channel, #state{channel = Channel}) -> info(channel, #state{channel = Channel}) ->
@ -293,7 +293,6 @@ websocket_init([Req, Opts]) ->
BytesIn = proplists:get_value(rate_limit, Opts), BytesIn = proplists:get_value(rate_limit, Opts),
RateLimit = emqx_zone:ratelimit(Zone), RateLimit = emqx_zone:ratelimit(Zone),
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
FrameOpts = emqx_zone:mqtt_frame_options(Zone), FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
@ -309,7 +308,6 @@ websocket_init([Req, Opts]) ->
{ok, #state{peername = Peername, {ok, #state{peername = Peername,
sockname = Sockname, sockname = Sockname,
sockstate = running, sockstate = running,
active_n = ActiveN,
mqtt_piggyback = MQTTPiggyback, mqtt_piggyback = MQTTPiggyback,
limiter = Limiter, limiter = Limiter,
parse_state = ParseState, parse_state = ParseState,
@ -372,7 +370,8 @@ websocket_info({check_gc, Stats}, State) ->
return(check_oom(run_gc(Stats, State))); return(check_oom(run_gc(Stats, State)));
websocket_info(Deliver = {deliver, _Topic, _Msg}, websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}) -> State = #state{zone = Zone, listener = Listener}) ->
ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
@ -551,11 +550,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_incoming(Packet, State = #state{active_n = ActiveN}) handle_incoming(Packet, State = #state{zone = Zone, listener = Listener})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
NState = case emqx_pd:get_counter(incoming_pubs) > ActiveN of NState = case emqx_pd:get_counter(incoming_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
true -> postpone({cast, rate_limit}, State); true -> postpone({cast, rate_limit}, State);
false -> State false -> State
end, end,
@ -586,11 +586,13 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
%% Handle outgoing packets %% Handle outgoing packets
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
zone = Zone, listener = Listener}) ->
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),
NState = case emqx_pd:get_counter(outgoing_pubs) > ActiveN of NState = case emqx_pd:get_counter(outgoing_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
true -> true ->
Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes) oct => emqx_pd:reset_counter(outgoing_bytes)