fix(limiter): fix an error when setting `max_conn_rate` in a listener

This commit is contained in:
firest 2023-05-11 13:52:13 +08:00
parent c8758190b5
commit b7126257a5
2 changed files with 30 additions and 22 deletions

View File

@ -286,7 +286,8 @@ default_client_config() ->
default_bucket_config() -> default_bucket_config() ->
#{ #{
rate => infinity, rate => infinity,
burst => 0 burst => 0,
initial => 0
}. }.
get_listener_opts(Conf) -> get_listener_opts(Conf) ->

View File

@ -347,7 +347,8 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == tcp; Type == ssl Type == tcp; Type == ssl
-> ->
Id = listener_id(Type, ListenerName), Id = listener_id(Type, ListenerName),
add_limiter_bucket(Id, Opts), Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
esockd:open( esockd:open(
Id, Id,
ListenOn, ListenOn,
@ -356,7 +357,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
#{ #{
listener => {Type, ListenerName}, listener => {Type, ListenerName},
zone => zone(Opts), zone => zone(Opts),
limiter => limiter(Opts), limiter => Limiter,
enable_authn => enable_authn(Opts) enable_authn => enable_authn(Opts)
} }
]} ]}
@ -366,9 +367,10 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
Type == ws; Type == wss Type == ws; Type == wss
-> ->
Id = listener_id(Type, ListenerName), Id = listener_id(Type, ListenerName),
add_limiter_bucket(Id, Opts), Limiter = limiter(Opts),
add_limiter_bucket(Id, Limiter),
RanchOpts = ranch_opts(Type, ListenOn, Opts), RanchOpts = ranch_opts(Type, ListenOn, Opts),
WsOpts = ws_opts(Type, ListenerName, Opts), WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
case Type of case Type of
ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
@ -415,20 +417,22 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
Password -> [{password, str(Password)}] Password -> [{password, str(Password)}]
end ++ end ++
optional_quic_listener_opts(Opts), optional_quic_listener_opts(Opts),
Limiter = limiter(Opts),
ConnectionOpts = #{ ConnectionOpts = #{
conn_callback => emqx_quic_connection, conn_callback => emqx_quic_connection,
peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
zone => zone(Opts), zone => zone(Opts),
listener => {quic, ListenerName}, listener => {quic, ListenerName},
limiter => limiter(Opts) limiter => Limiter
}, },
StreamOpts = #{ StreamOpts = #{
stream_callback => emqx_quic_stream, stream_callback => emqx_quic_stream,
active => 1 active => 1
}, },
Id = listener_id(quic, ListenerName), Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts), add_limiter_bucket(Id, Limiter),
quicer:start_listener( quicer:start_listener(
Id, Id,
ListenOn, ListenOn,
@ -532,12 +536,12 @@ esockd_opts(ListenerId, Type, Opts0) ->
end end
). ).
ws_opts(Type, ListenerName, Opts) -> ws_opts(Type, ListenerName, Opts, Limiter) ->
WsPaths = [ WsPaths = [
{emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
zone => zone(Opts), zone => zone(Opts),
listener => {Type, ListenerName}, listener => {Type, ListenerName},
limiter => limiter(Opts), limiter => Limiter,
enable_authn => enable_authn(Opts) enable_authn => enable_authn(Opts)
}} }}
], ],
@ -653,26 +657,29 @@ zone(Opts) ->
limiter(Opts) -> limiter(Opts) ->
emqx_limiter_schema:get_listener_opts(Opts). emqx_limiter_schema:get_listener_opts(Opts).
add_limiter_bucket(Id, #{limiter := Limiter}) -> add_limiter_bucket(_Id, undefined) ->
ok;
add_limiter_bucket(Id, Limiter) ->
maps:fold( maps:fold(
fun(Type, Cfg, _) -> fun(Type, Cfg, _) ->
emqx_limiter_server:add_bucket(Id, Type, Cfg) emqx_limiter_server:add_bucket(Id, Type, Cfg)
end, end,
ok, ok,
maps:without([client], Limiter) maps:without([client], Limiter)
); ).
add_limiter_bucket(_Id, _Cfg) ->
ok.
del_limiter_bucket(Id, #{limiter := Limiters}) -> del_limiter_bucket(Id, Conf) ->
lists:foreach( case limiter(Conf) of
fun(Type) -> undefined ->
emqx_limiter_server:del_bucket(Id, Type) ok;
end, Limiter ->
maps:keys(Limiters) lists:foreach(
); fun(Type) ->
del_limiter_bucket(_Id, _Cfg) -> emqx_limiter_server:del_bucket(Id, Type)
ok. end,
maps:keys(Limiter)
)
end.
enable_authn(Opts) -> enable_authn(Opts) ->
maps:get(enable_authn, Opts, true). maps:get(enable_authn, Opts, true).