Merge pull request #8156 from lafirest/fix/default_con_limiter

fix(limiter): add default connection limiter for listeners
This commit is contained in:
JianBo He 2022-06-15 13:38:13 +08:00 committed by GitHub
commit f1d4bab97d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 80 deletions

View File

@ -1,11 +1,13 @@
listeners.tcp.default { listeners.tcp.default {
bind = "0.0.0.0:1883" bind = "0.0.0.0:1883"
max_connections = 1024000 max_connections = 1024000
limiter.connection = default
} }
listeners.ssl.default { listeners.ssl.default {
bind = "0.0.0.0:8883" bind = "0.0.0.0:8883"
max_connections = 512000 max_connections = 512000
limiter.connection = default
ssl_options { ssl_options {
keyfile = "{{ platform_etc_dir }}/certs/key.pem" keyfile = "{{ platform_etc_dir }}/certs/key.pem"
certfile = "{{ platform_etc_dir }}/certs/cert.pem" certfile = "{{ platform_etc_dir }}/certs/cert.pem"
@ -16,12 +18,14 @@ listeners.ssl.default {
listeners.ws.default { listeners.ws.default {
bind = "0.0.0.0:8083" bind = "0.0.0.0:8083"
max_connections = 1024000 max_connections = 1024000
limiter.connection = default
websocket.mqtt_path = "/mqtt" websocket.mqtt_path = "/mqtt"
} }
listeners.wss.default { listeners.wss.default {
bind = "0.0.0.0:8084" bind = "0.0.0.0:8084"
max_connections = 512000 max_connections = 512000
limiter.connection = default
websocket.mqtt_path = "/mqtt" websocket.mqtt_path = "/mqtt"
ssl_options { ssl_options {
keyfile = "{{ platform_etc_dir }}/certs/key.pem" keyfile = "{{ platform_etc_dir }}/certs/key.pem"
@ -34,6 +38,7 @@ listeners.wss.default {
# enabled = false # enabled = false
# bind = "0.0.0.0:14567" # bind = "0.0.0.0:14567"
# max_connections = 1024000 # max_connections = 1024000
# limiter.connection = default
# keyfile = "{{ platform_etc_dir }}/certs/key.pem" # keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem" # certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#} #}

View File

@ -1,16 +1,5 @@
emqx_limiter_schema { emqx_limiter_schema {
enable {
desc {
en: """Enable"""
zh: """是否开启"""
}
label: {
en: """Enable"""
zh: """是否开启"""
}
}
failure_strategy { failure_strategy {
desc { desc {
en: """The strategy when all the retries failed.""" en: """The strategy when all the retries failed."""

View File

@ -0,0 +1,11 @@
limiter {
connection {
rate = "1000/s"
bucket {
default {
rate = "1000/s"
capacity = 1000
}
}
}
}

View File

@ -119,19 +119,9 @@ post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
Config = maps:get(Type, NewConf), Config = maps:get(Type, NewConf),
case emqx_limiter_server:whereis(Type) of case emqx_limiter_server:whereis(Type) of
undefined -> undefined ->
case Config of start_server(Type, Config);
#{enable := false} ->
ok;
_ ->
start_server(Type)
end;
_ -> _ ->
case Config of emqx_limiter_server:update_config(Type, Config)
#{enable := false} ->
stop_server(Type);
_ ->
emqx_limiter_server:update_config(Type, Config)
end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -30,8 +30,7 @@
namespace/0, namespace/0,
get_bucket_cfg_path/2, get_bucket_cfg_path/2,
desc/1, desc/1,
types/0, types/0
is_enable/1
]). ]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
@ -89,13 +88,12 @@ fields(limiter) ->
{Type, {Type,
?HOCON(?R_REF(limiter_opts), #{ ?HOCON(?R_REF(limiter_opts), #{
desc => ?DESC(Type), desc => ?DESC(Type),
default => #{<<"enable">> => false} default => #{}
})} })}
|| Type <- types() || Type <- types()
]; ];
fields(limiter_opts) -> fields(limiter_opts) ->
[ [
{enable, ?HOCON(boolean(), #{desc => ?DESC(enable), default => true})},
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
{burst, {burst,
?HOCON(burst_rate(), #{ ?HOCON(burst_rate(), #{
@ -202,10 +200,6 @@ to_rate(Str) ->
get_bucket_cfg_path(Type, BucketName) -> get_bucket_cfg_path(Type, BucketName) ->
[limiter, Type, bucket, BucketName]. [limiter, Type, bucket, BucketName].
-spec is_enable(limiter_type()) -> boolean().
is_enable(Type) ->
emqx:get_config([limiter, Type, enable], false).
types() -> types() ->
[bytes_in, message_in, connection, message_routing, batch]. [bytes_in, message_in, connection, message_routing, batch].

View File

@ -113,13 +113,10 @@
connect(_Type, undefined) -> connect(_Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, emqx_htb_limiter:make_infinity_limiter()};
connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, BucketName) when is_atom(BucketName) ->
case check_enable_and_get_bucket_cfg(Type, BucketName) of case get_bucket_cfg(Type, BucketName) of
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
{error, config_not_found}; {error, config_not_found};
limiter_not_started ->
?SLOG(error, #{msg => "limiter_not_started", type => Type, bucket => BucketName}),
{error, limiter_not_started};
#{ #{
rate := AggrRate, rate := AggrRate,
capacity := AggrSize, capacity := AggrSize,
@ -602,13 +599,8 @@ call(Type, Msg) ->
gen_server:call(Pid, Msg) gen_server:call(Pid, Msg)
end. end.
-spec check_enable_and_get_bucket_cfg(limiter_type(), bucket_name()) -> -spec get_bucket_cfg(limiter_type(), bucket_name()) ->
undefined | limiter_not_started | hocons:config(). undefined | limiter_not_started | hocons:config().
check_enable_and_get_bucket_cfg(Type, Bucket) -> get_bucket_cfg(Type, Bucket) ->
case emqx_limiter_schema:is_enable(Type) of Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
false -> emqx:get_config(Path, undefined).
limiter_not_started;
_ ->
Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
emqx:get_config(Path, undefined)
end.

View File

@ -101,14 +101,4 @@ make_child(Type, Cfg) ->
}. }.
childs() -> childs() ->
Conf = emqx:get_config([limiter]), [make_child(Type) || Type <- emqx_limiter_schema:types()].
lists:foldl(
fun
({Type, #{enable := true}}, Acc) ->
[make_child(Type) | Acc];
(_, Acc) ->
Acc
end,
[],
maps:to_list(Conf)
).

View File

@ -49,32 +49,33 @@ init_per_testcase(Case, Config) when
undefined -> ok; undefined -> ok;
Listeners -> emqx_config:put([listeners], maps:remove(quic, Listeners)) Listeners -> emqx_config:put([listeners], maps:remove(quic, Listeners))
end, end,
PrevListeners = emqx_config:get([listeners, tcp], #{}),
PrevRateLimit = emqx_config:get([rate_limit], #{}), PrevListeners = emqx_config:get([listeners]),
emqx_config:put( PureListeners = remove_default_limiter(PrevListeners),
[listeners, tcp], PureListeners2 = PureListeners#{
#{ tcp => #{
listener_test => #{ listener_test => #{
bind => {"127.0.0.1", 9999}, bind => {"127.0.0.1", 9999},
max_connections => 4321, max_connections => 4321,
limiter => #{} limiter => #{}
} }
} }
), },
emqx_config:put([rate_limit], #{max_conn_rate => 1000}), emqx_config:put([listeners], PureListeners2),
ok = emqx_listeners:start(), ok = emqx_listeners:start(),
[ [
{prev_listener_conf, PrevListeners}, {prev_listener_conf, PrevListeners}
{prev_rate_limit_conf, PrevRateLimit}
| Config | Config
]; ];
init_per_testcase(t_wss_conn, Config) -> init_per_testcase(t_wss_conn, Config) ->
catch emqx_config_handler:stop(), catch emqx_config_handler:stop(),
{ok, _} = emqx_config_handler:start_link(), {ok, _} = emqx_config_handler:start_link(),
PrevListeners = emqx_config:get([listeners, wss], #{}),
emqx_config:put( PrevListeners = emqx_config:get([listeners]),
[listeners, wss], PureListeners = remove_default_limiter(PrevListeners),
#{ PureListeners2 = PureListeners#{
wss => #{
listener_test => #{ listener_test => #{
bind => {{127, 0, 0, 1}, 9998}, bind => {{127, 0, 0, 1}, 9998},
limiter => #{}, limiter => #{},
@ -85,7 +86,9 @@ init_per_testcase(t_wss_conn, Config) ->
} }
} }
} }
), },
emqx_config:put([listeners], PureListeners2),
ok = emqx_listeners:start(), ok = emqx_listeners:start(),
[ [
{prev_listener_conf, PrevListeners} {prev_listener_conf, PrevListeners}
@ -94,25 +97,31 @@ init_per_testcase(t_wss_conn, Config) ->
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
catch emqx_config_handler:stop(), catch emqx_config_handler:stop(),
{ok, _} = emqx_config_handler:start_link(), {ok, _} = emqx_config_handler:start_link(),
Config. PrevListeners = emqx_config:get([listeners]),
PureListeners = remove_default_limiter(PrevListeners),
emqx_config:put([listeners], PureListeners),
[
{prev_listener_conf, PrevListeners}
| Config
].
end_per_testcase(Case, Config) when end_per_testcase(Case, Config) when
Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp
-> ->
PrevListener = ?config(prev_listener_conf, Config), PrevListener = ?config(prev_listener_conf, Config),
PrevRateLimit = ?config(prev_rate_limit_conf, Config),
emqx_listeners:stop(), emqx_listeners:stop(),
emqx_config:put([listeners, tcp], PrevListener), emqx_config:put([listeners], PrevListener),
emqx_config:put([rate_limit], PrevRateLimit),
_ = emqx_config_handler:stop(), _ = emqx_config_handler:stop(),
ok; ok;
end_per_testcase(t_wss_conn, Config) -> end_per_testcase(t_wss_conn, Config) ->
PrevListener = ?config(prev_listener_conf, Config), PrevListener = ?config(prev_listener_conf, Config),
emqx_listeners:stop(), emqx_listeners:stop(),
emqx_config:put([listeners, wss], PrevListener), emqx_config:put([listeners], PrevListener),
_ = emqx_config_handler:stop(), _ = emqx_config_handler:stop(),
ok; ok;
end_per_testcase(_, _Config) -> end_per_testcase(_, Config) ->
PrevListener = ?config(prev_listener_conf, Config),
emqx_config:put([listeners], PrevListener),
_ = emqx_config_handler:stop(), _ = emqx_config_handler:stop(),
ok. ok.
@ -184,3 +193,16 @@ get_base_dir(Module) ->
get_base_dir() -> get_base_dir() ->
get_base_dir(?MODULE). get_base_dir(?MODULE).
remove_default_limiter(Listeners) ->
maps:map(
fun(_, X) ->
maps:map(
fun(_, E) ->
maps:remove(limiter, E)
end,
X
)
end,
Listeners
).

View File

@ -29,7 +29,6 @@
"\n" "\n"
"limiter {\n" "limiter {\n"
" bytes_in {\n" " bytes_in {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -37,7 +36,6 @@
" }\n" " }\n"
"\n" "\n"
" message_in {\n" " message_in {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -45,7 +43,6 @@
" }\n" " }\n"
"\n" "\n"
" connection {\n" " connection {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -53,7 +50,6 @@
" }\n" " }\n"
"\n" "\n"
" message_routing {\n" " message_routing {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -61,7 +57,6 @@
" }\n" " }\n"
"\n" "\n"
" batch {\n" " batch {\n"
" enable = true\n"
" bucket.retainer {\n" " bucket.retainer {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"