diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index bd3ae4af0..f3dee0bcf 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,11 +1,13 @@ listeners.tcp.default { bind = "0.0.0.0:1883" max_connections = 1024000 + limiter.connection = default } listeners.ssl.default { bind = "0.0.0.0:8883" max_connections = 512000 + limiter.connection = default ssl_options { keyfile = "{{ platform_etc_dir }}/certs/key.pem" certfile = "{{ platform_etc_dir }}/certs/cert.pem" @@ -16,12 +18,14 @@ listeners.ssl.default { listeners.ws.default { bind = "0.0.0.0:8083" max_connections = 1024000 + limiter.connection = default websocket.mqtt_path = "/mqtt" } listeners.wss.default { bind = "0.0.0.0:8084" max_connections = 512000 + limiter.connection = default websocket.mqtt_path = "/mqtt" ssl_options { keyfile = "{{ platform_etc_dir }}/certs/key.pem" @@ -34,6 +38,7 @@ listeners.wss.default { # enabled = false # bind = "0.0.0.0:14567" # max_connections = 1024000 +# limiter.connection = default # keyfile = "{{ platform_etc_dir }}/certs/key.pem" # certfile = "{{ platform_etc_dir }}/certs/cert.pem" #} diff --git a/apps/emqx/i18n/emqx_limiter_i18n.conf b/apps/emqx/i18n/emqx_limiter_i18n.conf index 006a0662e..6fbc923b7 100644 --- a/apps/emqx/i18n/emqx_limiter_i18n.conf +++ b/apps/emqx/i18n/emqx_limiter_i18n.conf @@ -1,16 +1,5 @@ emqx_limiter_schema { - enable { - desc { - en: """Enable""" - zh: """是否开启""" - } - label: { - en: """Enable""" - zh: """是否开启""" - } - } - failure_strategy { desc { en: """The strategy when all the retries failed.""" diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index e69de29bb..36c48dde3 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -0,0 +1,11 @@ +limiter { + connection { + rate = "1000/s" + bucket { + default { + rate = "1000/s" + capacity = 1000 + } + } + } +} diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 5b9413cb9..89148a12c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -119,19 +119,9 @@ post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) -> Config = maps:get(Type, NewConf), case emqx_limiter_server:whereis(Type) of undefined -> - case Config of - #{enable := false} -> - ok; - _ -> - start_server(Type) - end; + start_server(Type, Config); _ -> - case Config of - #{enable := false} -> - stop_server(Type); - _ -> - emqx_limiter_server:update_config(Type, Config) - end + emqx_limiter_server:update_config(Type, Config) end. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index afb46498a..c050956ec 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -30,8 +30,7 @@ namespace/0, get_bucket_cfg_path/2, desc/1, - types/0, - is_enable/1 + types/0 ]). -define(KILOBYTE, 1024). @@ -89,13 +88,12 @@ fields(limiter) -> {Type, ?HOCON(?R_REF(limiter_opts), #{ desc => ?DESC(Type), - default => #{<<"enable">> => false} + default => #{} })} || Type <- types() ]; fields(limiter_opts) -> [ - {enable, ?HOCON(boolean(), #{desc => ?DESC(enable), default => true})}, {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {burst, ?HOCON(burst_rate(), #{ @@ -202,10 +200,6 @@ to_rate(Str) -> get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. --spec is_enable(limiter_type()) -> boolean(). -is_enable(Type) -> - emqx:get_config([limiter, Type, enable], false). - types() -> [bytes_in, message_in, connection, message_routing, batch]. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index d5ace9551..9c344c752 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -113,13 +113,10 @@ connect(_Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Type, BucketName) when is_atom(BucketName) -> - case check_enable_and_get_bucket_cfg(Type, BucketName) of + case get_bucket_cfg(Type, BucketName) of undefined -> ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), {error, config_not_found}; - limiter_not_started -> - ?SLOG(error, #{msg => "limiter_not_started", type => Type, bucket => BucketName}), - {error, limiter_not_started}; #{ rate := AggrRate, capacity := AggrSize, @@ -602,13 +599,8 @@ call(Type, Msg) -> gen_server:call(Pid, Msg) end. --spec check_enable_and_get_bucket_cfg(limiter_type(), bucket_name()) -> +-spec get_bucket_cfg(limiter_type(), bucket_name()) -> undefined | limiter_not_started | hocons:config(). -check_enable_and_get_bucket_cfg(Type, Bucket) -> - case emqx_limiter_schema:is_enable(Type) of - false -> - limiter_not_started; - _ -> - Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket), - emqx:get_config(Path, undefined) - end. +get_bucket_cfg(Type, Bucket) -> + Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket), + emqx:get_config(Path, undefined). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 2f0fa4391..d75bf1082 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -101,14 +101,4 @@ make_child(Type, Cfg) -> }. childs() -> - Conf = emqx:get_config([limiter]), - lists:foldl( - fun - ({Type, #{enable := true}}, Acc) -> - [make_child(Type) | Acc]; - (_, Acc) -> - Acc - end, - [], - maps:to_list(Conf) - ). + [make_child(Type) || Type <- emqx_limiter_schema:types()]. diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index a9d299d88..3dfdf66cf 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -49,32 +49,33 @@ init_per_testcase(Case, Config) when undefined -> ok; Listeners -> emqx_config:put([listeners], maps:remove(quic, Listeners)) end, - PrevListeners = emqx_config:get([listeners, tcp], #{}), - PrevRateLimit = emqx_config:get([rate_limit], #{}), - emqx_config:put( - [listeners, tcp], - #{ + + PrevListeners = emqx_config:get([listeners]), + PureListeners = remove_default_limiter(PrevListeners), + PureListeners2 = PureListeners#{ + tcp => #{ listener_test => #{ bind => {"127.0.0.1", 9999}, max_connections => 4321, limiter => #{} } } - ), - emqx_config:put([rate_limit], #{max_conn_rate => 1000}), + }, + emqx_config:put([listeners], PureListeners2), + ok = emqx_listeners:start(), [ - {prev_listener_conf, PrevListeners}, - {prev_rate_limit_conf, PrevRateLimit} + {prev_listener_conf, PrevListeners} | Config ]; init_per_testcase(t_wss_conn, Config) -> catch emqx_config_handler:stop(), {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners, wss], #{}), - emqx_config:put( - [listeners, wss], - #{ + + PrevListeners = emqx_config:get([listeners]), + PureListeners = remove_default_limiter(PrevListeners), + PureListeners2 = PureListeners#{ + wss => #{ listener_test => #{ bind => {{127, 0, 0, 1}, 9998}, limiter => #{}, @@ -85,7 +86,9 @@ init_per_testcase(t_wss_conn, Config) -> } } } - ), + }, + emqx_config:put([listeners], PureListeners2), + ok = emqx_listeners:start(), [ {prev_listener_conf, PrevListeners} @@ -94,25 +97,31 @@ init_per_testcase(t_wss_conn, Config) -> init_per_testcase(_, Config) -> catch emqx_config_handler:stop(), {ok, _} = emqx_config_handler:start_link(), - Config. + PrevListeners = emqx_config:get([listeners]), + PureListeners = remove_default_limiter(PrevListeners), + emqx_config:put([listeners], PureListeners), + [ + {prev_listener_conf, PrevListeners} + | Config + ]. end_per_testcase(Case, Config) when Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp -> PrevListener = ?config(prev_listener_conf, Config), - PrevRateLimit = ?config(prev_rate_limit_conf, Config), emqx_listeners:stop(), - emqx_config:put([listeners, tcp], PrevListener), - emqx_config:put([rate_limit], PrevRateLimit), + emqx_config:put([listeners], PrevListener), _ = emqx_config_handler:stop(), ok; end_per_testcase(t_wss_conn, Config) -> PrevListener = ?config(prev_listener_conf, Config), emqx_listeners:stop(), - emqx_config:put([listeners, wss], PrevListener), + emqx_config:put([listeners], PrevListener), _ = emqx_config_handler:stop(), ok; -end_per_testcase(_, _Config) -> +end_per_testcase(_, Config) -> + PrevListener = ?config(prev_listener_conf, Config), + emqx_config:put([listeners], PrevListener), _ = emqx_config_handler:stop(), ok. @@ -184,3 +193,16 @@ get_base_dir(Module) -> get_base_dir() -> get_base_dir(?MODULE). + +remove_default_limiter(Listeners) -> + maps:map( + fun(_, X) -> + maps:map( + fun(_, E) -> + maps:remove(limiter, E) + end, + X + ) + end, + Listeners + ). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 0b6f7c94e..d6ad9f1a0 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -29,7 +29,6 @@ "\n" "limiter {\n" " bytes_in {\n" - " enable = true\n" " bucket.default {\n" " rate = infinity\n" " capacity = infinity\n" @@ -37,7 +36,6 @@ " }\n" "\n" " message_in {\n" - " enable = true\n" " bucket.default {\n" " rate = infinity\n" " capacity = infinity\n" @@ -45,7 +43,6 @@ " }\n" "\n" " connection {\n" - " enable = true\n" " bucket.default {\n" " rate = infinity\n" " capacity = infinity\n" @@ -53,7 +50,6 @@ " }\n" "\n" " message_routing {\n" - " enable = true\n" " bucket.default {\n" " rate = infinity\n" " capacity = infinity\n" @@ -61,7 +57,6 @@ " }\n" "\n" " batch {\n" - " enable = true\n" " bucket.retainer {\n" " rate = infinity\n" " capacity = infinity\n"