refactor(config): make rate_limit only in listeners for now

This commit is contained in:
Shawn 2021-07-19 15:36:00 +08:00
parent e6424d63d8
commit 2898e9c6dc
3 changed files with 38 additions and 40 deletions

View File

@ -31,8 +31,8 @@
-export([ info/1 -export([ info/1
, info/2 , info/2
, get_mqtt_conf/2
, get_mqtt_conf/3 , get_mqtt_conf/3
, get_mqtt_conf/4
, set_conn_state/2 , set_conn_state/2
, get_session/1 , get_session/1
, set_session/2 , set_session/2
@ -206,11 +206,11 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) -> sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) ->
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Protocol = maps:get(protocol, ConnInfo, mqtt), Protocol = maps:get(protocol, ConnInfo, mqtt),
MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of MountPoint = case get_mqtt_conf(Zone, mountpoint) of
<<>> -> undefined; <<>> -> undefined;
MP -> MP MP -> MP
end, end,
QuotaPolicy = emqx_config:get_zone_conf(Zone, [rate_limit, quota]), QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener,[rate_limit, quota], []),
ClientInfo = set_peercert_infos( ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{zone => Zone, #{zone => Zone,
@ -249,11 +249,11 @@ set_peercert_infos(NoSSL, ClientInfo, _, _)
NoSSL =:= undefined -> NoSSL =:= undefined ->
ClientInfo#{username => undefined}; ClientInfo#{username => undefined};
set_peercert_infos(Peercert, ClientInfo, Zone, Listener) -> set_peercert_infos(Peercert, ClientInfo, Zone, _Listener) ->
{DN, CN} = {esockd_peercert:subject(Peercert), {DN, CN} = {esockd_peercert:subject(Peercert),
esockd_peercert:common_name(Peercert)}, esockd_peercert:common_name(Peercert)},
PeercetAs = fun(Key) -> PeercetAs = fun(Key) ->
case get_mqtt_conf(Zone, Listener, Key) of case get_mqtt_conf(Zone, Key) of
cn -> CN; cn -> CN;
dn -> DN; dn -> DN;
crt -> Peercert; crt -> Peercert;
@ -426,7 +426,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
end; end;
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) -> Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> ok ->
TopicFilters0 = parse_topic_filters(TopicFilters), TopicFilters0 = parse_topic_filters(TopicFilters),
@ -538,7 +538,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
Channel = #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> Channel = #channel{clientinfo = #{zone := Zone}}) ->
case pipeline([fun check_quota_exceeded/2, case pipeline([fun check_quota_exceeded/2,
fun process_alias/2, fun process_alias/2,
fun check_pub_alias/2, fun check_pub_alias/2,
@ -995,7 +995,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting})
handle_info({sock_closed, Reason}, Channel = handle_info({sock_closed, Reason}, Channel =
#channel{conn_state = ConnState, #channel{conn_state = ConnState,
clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) clientinfo = ClientInfo = #{zone := Zone}})
when ConnState =:= connected orelse ConnState =:= reauthenticating -> when ConnState =:= connected orelse ConnState =:= reauthenticating ->
emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) emqx_config:get_zone_conf(Zone, [flapping_detect, enable])
andalso emqx_flapping:detect(ClientInfo), andalso emqx_flapping:detect(ClientInfo),
@ -1158,9 +1158,9 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
username = Username username = Username
}, },
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo,
clientinfo = #{zone := Zone, listener := Listener} clientinfo = #{zone := Zone}
}) -> }) ->
ExpiryInterval = expiry_interval(Zone, Listener, ConnPkt), ExpiryInterval = expiry_interval(Zone, ConnPkt),
NConnInfo = ConnInfo#{proto_name => ProtoName, NConnInfo = ConnInfo#{proto_name => ProtoName,
proto_ver => ProtoVer, proto_ver => ProtoVer,
clean_start => CleanStart, clean_start => CleanStart,
@ -1169,21 +1169,21 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
username => Username, username => Username,
conn_props => ConnProps, conn_props => ConnProps,
expiry_interval => ExpiryInterval, expiry_interval => ExpiryInterval,
receive_maximum => receive_maximum(Zone, Listener, ConnProps) receive_maximum => receive_maximum(Zone, ConnProps)
}, },
{ok, Channel#channel{conninfo = NConnInfo}}. {ok, Channel#channel{conninfo = NConnInfo}}.
%% If the Session Expiry Interval is absent the value 0 is used. %% If the Session Expiry Interval is absent the value 0 is used.
expiry_interval(_, _, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
properties = ConnProps}) -> properties = ConnProps}) ->
emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
expiry_interval(Zone, Listener, #mqtt_packet_connect{clean_start = false}) -> expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
get_mqtt_conf(Zone, Listener, session_expiry_interval); get_mqtt_conf(Zone, session_expiry_interval);
expiry_interval(_, _, #mqtt_packet_connect{clean_start = true}) -> expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
0. 0.
receive_maximum(Zone, Listener, ConnProps) -> receive_maximum(Zone, ConnProps) ->
MaxInflightConfig = case get_mqtt_conf(Zone, Listener, max_inflight) of MaxInflightConfig = case get_mqtt_conf(Zone, max_inflight) of
0 -> ?RECEIVE_MAXIMUM_LIMIT; 0 -> ?RECEIVE_MAXIMUM_LIMIT;
N -> N N -> N
end, end,
@ -1232,9 +1232,9 @@ set_bridge_mode(_ConnPkt, _ClientInfo) -> ok.
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) -> maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
{ok, ClientInfo}; {ok, ClientInfo};
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, listener := Listener, maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone,
username := Username}) -> username := Username}) ->
case get_mqtt_conf(Zone, Listener, use_username_as_clientid) of case get_mqtt_conf(Zone, use_username_as_clientid) of
true -> {ok, ClientInfo#{clientid => Username}}; true -> {ok, ClientInfo#{clientid => Username}};
false -> ok false -> ok
end. end.
@ -1477,9 +1477,8 @@ put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters.
enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
SubOpts; SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge, enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
listener := Listener}}) -> NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)),
NL = flag(get_mqtt_conf(Zone, Listener, ignore_loop_deliver)),
SubOpts#{rap => flag(IsBridge), nl => NL}. SubOpts#{rap => flag(IsBridge), nl => NL}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1515,8 +1514,8 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enrich server keepalive %% Enrich server keepalive
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
case get_mqtt_conf(Zone, Listener, server_keepalive) of case get_mqtt_conf(Zone, server_keepalive) of
disabled -> AckProps; disabled -> AckProps;
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
end. end.
@ -1525,11 +1524,11 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener
%% Enrich response information %% Enrich response information
enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps},
clientinfo = #{zone := Zone, listener := Listener}}) -> clientinfo = #{zone := Zone}}) ->
case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of
0 -> AckProps; 0 -> AckProps;
1 -> AckProps#{'Response-Information' => 1 -> AckProps#{'Response-Information' =>
case get_mqtt_conf(Zone, Listener, response_information, "") of case get_mqtt_conf(Zone, response_information, "") of
"" -> undefined; "" -> undefined;
RspInfo -> RspInfo RspInfo -> RspInfo
end} end}
@ -1581,9 +1580,8 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(0, Channel) -> Channel;
ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone, ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
listener := Listener}}) -> Backoff = get_mqtt_conf(Zone, keepalive_backoff),
Backoff = get_mqtt_conf(Zone, Listener, keepalive_backoff),
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
@ -1624,7 +1622,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Is ACL enabled? %% Is ACL enabled?
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]). (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1736,10 +1734,10 @@ sp(false) -> 0.
flag(true) -> 1; flag(true) -> 1;
flag(false) -> 0. flag(false) -> 0.
get_mqtt_conf(Zone, Listener, Key) -> get_mqtt_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]). emqx_config:get_zone_conf(Zone, [mqtt, Key]).
get_mqtt_conf(Zone, Listener, Key, Default) -> get_mqtt_conf(Zone, Key, Default) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -269,7 +269,7 @@ init_state(Transport, Socket, #{zone := Zone, listener := Listener} = Opts) ->
true -> undefined; true -> undefined;
false -> disabled false -> disabled
end, end,
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
IdleTimer = start_timer(IdleTimeout, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout),
#state{transport = Transport, #state{transport = Transport,
socket = Socket, socket = Socket,
@ -908,5 +908,5 @@ get_state(Pid) ->
get_active_n(Zone, Listener) -> get_active_n(Zone, Listener) ->
case emqx_config:get([zones, Zone, listeners, Listener, type]) of case emqx_config:get([zones, Zone, listeners, Listener, type]) of
quic -> 100; quic -> 100;
_ -> emqx_config:get_zone_conf(Zone, [tcp, active_n]) _ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n])
end. end.

View File

@ -244,7 +244,7 @@ check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) ->
websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
{Peername, Peercert} = {Peername, Peercert} =
case emqx_config:get_zone_conf(Zone, [proxy_protocol]) andalso case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso
maps:get(proxy_header, Req) of maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
SourceName = {SrcAddr, SrcPort}, SourceName = {SrcAddr, SrcPort},
@ -296,7 +296,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
false -> disabled false -> disabled
end, end,
%% MQTT Idle Timeout %% MQTT Idle Timeout
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
IdleTimer = start_timer(IdleTimeout, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout),
case emqx_config:get_zone_conf(emqx_channel:info(zone, Channel), case emqx_config:get_zone_conf(emqx_channel:info(zone, Channel),
[force_shutdown]) of [force_shutdown]) of
@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) ->
websocket_info(Deliver = {deliver, _Topic, _Msg}, websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{zone = Zone, listener = Listener}) -> State = #state{zone = Zone, listener = Listener}) ->
ActiveN = emqx_config:get_zone_conf(Zone, [tcp, active_n]), ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
@ -564,7 +564,7 @@ handle_incoming(Packet, State = #state{zone = Zone, listener = Listener})
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
NState = case emqx_pd:get_counter(incoming_pubs) > NState = case emqx_pd:get_counter(incoming_pubs) >
emqx_config:get_zone_conf(Zone, [tcp, active_n]) of emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
true -> postpone({cast, rate_limit}, State); true -> postpone({cast, rate_limit}, State);
false -> State false -> State
end, end,
@ -601,7 +601,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),
NState = case emqx_pd:get_counter(outgoing_pubs) > NState = case emqx_pd:get_counter(outgoing_pubs) >
emqx_config:get_zone_conf(Zone, [tcp, active_n]) of emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
true -> true ->
Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes) oct => emqx_pd:reset_counter(outgoing_bytes)
@ -789,4 +789,4 @@ set_field(Name, Value, State) ->
setelement(Pos+1, State, Value). setelement(Pos+1, State, Value).
get_ws_opts(Zone, Listener, Key) -> get_ws_opts(Zone, Listener, Key) ->
emqx_config:get_zone_conf(Zone, [websocket, Key]). emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]).