diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 161c05a12..7bd8cfe73 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -960,29 +960,6 @@ rate_limit { conn_bytes_in = "100KB,10s" } -quota { - ## Messages quota for the each of external MQTT connection. - ## This value consumed by the number of recipient on a message. - ## - ## @doc quota.conn_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messaegs per 1s: - ## quota.conn_messages_routing: "100,1s" - conn_messages_routing = "100,1s" - - ## Messages quota for the all of external MQTT connections. - ## This value consumed by the number of recipient on a message. - ## - ## @doc quota.overall_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 200000 messages per 1s: - ## quota.overall_messages_routing: "200000,1s" - ## - overall_messages_routing = "200000,1s" -} - ##================================================================== ## Zones ##================================================================== @@ -1020,8 +997,6 @@ quota { ## - `flapping_detect.*` ## - `force_shutdown.*` ## - `conn_congestion.*` -## - `rate_limit.*` -## - `quota.*` ## - `force_gc.*` ## ## syntax: zones. diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 0d58cf4b8..17cf9a50b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -16,7 +16,7 @@ , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.24.0"}}} diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_generic.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_generic.erl new file mode 100644 index 000000000..f14e09b5d --- /dev/null +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_generic.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_htb_generic). + +-behaviour(esockd_generic_limiter). + +%% API +-export([new_create_options/2, create/1, delete/1, consume/2]). + +-type create_options() :: #{ module := emqx_htb_generic + , type := emqx_limiter_schema:limiter_type() + , bucket := emqx_limiter_schema:bucket_name() + }. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec new_create_options(emqx_limiter_schema:limiter_type(), + emqx_limiter_schema:bucket_name()) -> create_options(). +new_create_options(Type, BucketName) -> + #{module => ?MODULE, type => Type, bucket => BucketName}. + +-spec create(create_options()) -> esockd_generic_limiter:limiter(). +create(#{module := ?MODULE, type := Type, bucket := BucketName}) -> + Limiter = emqx_limiter_server:connect(Type, BucketName), + #{module => ?MODULE, name => Type, limiter => Limiter}. + +delete(_GLimiter) -> + ok. + +consume(Token, #{limiter := Limiter} = GLimiter) -> + case emqx_htb_limiter:check(Token, Limiter) of + {ok, Limiter2} -> + {ok, GLimiter#{limiter := Limiter2}}; + {pause, Ms, Retry, Limiter2} -> + {pause, Ms, GLimiter#{limiter := emqx_htb_limiter:set_retry(Retry, Limiter2)}}; + {drop, Limiter2} -> + {ok, GLimiter#{limiter := Limiter2}} + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index a249447d3..32ed8215c 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -306,10 +306,13 @@ do_flatten_listeners(Type, Conf0) -> esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), - Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of - infinity -> Opts1; - Rate -> Opts1#{max_conn_rate => Rate} - end, + Limiter = limiter(Opts0), + Opts2 = case maps:get(connection, Limiter, undefined) of + undefined -> + Opts1; + BucketName -> + Opts1#{limiter => emqx_htb_generic:new_create_options(connection, BucketName)} + end, Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])) , tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} }, diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 51a9273b3..39e5c96fe 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -163,11 +163,8 @@ this number of messages or bytes have passed through.""" , {"conn_congestion", sc(ref("conn_congestion"), #{})} - , {"quota", - sc(ref("quota"), - #{})} , {"stats", - sc(ref("stats"), + sc(ref("stats"), #{})} , {"sysmon", sc(ref("sysmon"), @@ -494,19 +491,6 @@ fields("rate_limit") -> } ]; -fields("quota") -> - [ {"conn_messages_routing", - sc(hoconsc:union([infinity, comma_separated_list()]), - #{ default => infinity - }) - } - , {"overall_messages_routing", - sc(hoconsc:union([infinity, comma_separated_list()]), - #{ default => infinity - }) - } - ]; - fields("flapping_detect") -> [ {"enable", sc(boolean(), diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index 140cd1aca..ad5947051 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -23,7 +23,7 @@ namespace() -> zone. %% this shcema module is not used at root level. %% roots are added only for document generation. roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown", - "conn_congestion", "rate_limit", "quota", "force_gc", + "conn_congestion", "force_gc", "overload_protection" ]. diff --git a/mix.exs b/mix.exs index 5e03718c7..2e3219646 100644 --- a/mix.exs +++ b/mix.exs @@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.9.0", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.9.1", override: true}, {:mria, github: "emqx/mria", tag: "0.2.0", override: true}, {:ekka, github: "emqx/ekka", tag: "0.12.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true}, diff --git a/rebar.config b/rebar.config index c6d374fca..634ab807f 100644 --- a/rebar.config +++ b/rebar.config @@ -52,7 +52,7 @@ , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.1"}}} , {mria, {git, "https://github.com/emqx/mria", {tag, "0.2.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.12.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}