From 55376144ce183bb39669cd4f1bdd87bf8ab5e658 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 17 Apr 2023 10:06:36 +0800 Subject: [PATCH] fix(limiter): simplify the configuration of the limiter --- apps/emqx/src/emqx_connection.erl | 6 +- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 3 +- .../src/emqx_limiter_bucket_ref.erl | 5 + .../emqx_limiter/src/emqx_limiter_schema.erl | 152 ++++++++++++------ .../emqx_limiter/src/emqx_limiter_server.erl | 46 +++--- apps/emqx/src/emqx_ws_connection.erl | 4 +- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 75 ++++----- rel/i18n/emqx_limiter_schema.hocon | 34 +--- 8 files changed, 183 insertions(+), 142 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 8d47f033c..27b6f3e84 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -182,10 +182,8 @@ -define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -%% use macro to do compile time limiter's type check --define(LIMITER_BYTES_IN, bytes_in). --define(LIMITER_MESSAGE_IN, message_in). --define(EMPTY_QUEUE, {[], []}). +-define(LIMITER_BYTES_IN, bytes). +-define(LIMITER_MESSAGE_IN, messages). -dialyzer({no_match, [info/2]}). -dialyzer( diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index bbebd9460..53f26deb5 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -139,7 +139,8 @@ make_token_bucket_limiter(Cfg, Bucket) -> Cfg#{ tokens => emqx_limiter_server:get_initial_val(Cfg), lasttime => ?NOW, - bucket => Bucket + bucket => Bucket, + capacity => emqx_limiter_schema:calc_capacity(Cfg) }. %%@doc create a limiter server's reference diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index fe30e41e9..139564df7 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -23,6 +23,7 @@ %% API -export([ new/3, + infinity_bucket/0, check/3, try_restore/2, available/1 @@ -58,6 +59,10 @@ new(Counter, Index, Rate) -> rate => Rate }. +-spec infinity_bucket() -> bucket_ref(). +infinity_bucket() -> + infinity. + %% @doc check tokens -spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) -> HasToken :: diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index f45fc55b6..f59ddc35b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -31,20 +31,20 @@ get_bucket_cfg_path/2, desc/1, types/0, - infinity_value/0 + calc_capacity/1 ]). -define(KILOBYTE, 1024). -define(BUCKET_KEYS, [ - {bytes_in, bucket_infinity}, - {message_in, bucket_infinity}, + {bytes, bucket_infinity}, + {messages, bucket_infinity}, {connection, bucket_limit}, {message_routing, bucket_infinity} ]). -type limiter_type() :: - bytes_in - | message_in + bytes + | messages | connection | message_routing %% internal limiter for unclassified resources @@ -90,14 +90,17 @@ namespace() -> limiter. -roots() -> [limiter]. +roots() -> + [{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}]. fields(limiter) -> [ {Type, ?HOCON(?R_REF(node_opts), #{ desc => ?DESC(Type), - default => #{} + default => #{}, + importance => ?IMPORTANCE_HIDDEN, + aliases => alias_of_type(Type) })} || Type <- types() ] ++ @@ -107,6 +110,7 @@ fields(limiter) -> ?R_REF(client_fields), #{ desc => ?DESC(client), + importance => ?IMPORTANCE_HIDDEN, default => maps:from_list([ {erlang:atom_to_binary(Type), #{}} || Type <- types() @@ -124,30 +128,50 @@ fields(node_opts) -> })} ]; fields(client_fields) -> - [ - {Type, - ?HOCON(?R_REF(client_opts), #{ - desc => ?DESC(Type), - default => #{} - })} - || Type <- types() - ]; + client_fields(types(), #{default => #{}}); fields(bucket_infinity) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})}, - {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"infinity">>})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} + {burst, + ?HOCON(capacity(), #{ + desc => ?DESC(capacity), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] + })}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })} ]; fields(bucket_limit) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})}, - {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"1000">>})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} + {burst, + ?HOCON(capacity(), #{ + desc => ?DESC(burst), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] + })}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })} ]; fields(client_opts) -> [ {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })}, %% low_watermark add for emqx_channel and emqx_session %% both modules consume first and then check %% so we need to use this value to prevent excessive consumption @@ -157,20 +181,24 @@ fields(client_opts) -> initial(), #{ desc => ?DESC(low_watermark), - default => <<"0">> + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN } )}, - {capacity, + {burst, ?HOCON(capacity(), #{ - desc => ?DESC(client_bucket_capacity), - default => <<"infinity">> + desc => ?DESC(burst), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] })}, {divisible, ?HOCON( boolean(), #{ desc => ?DESC(divisible), - default => false + default => false, + importance => ?IMPORTANCE_HIDDEN } )}, {max_retry_time, @@ -178,7 +206,8 @@ fields(client_opts) -> emqx_schema:duration(), #{ desc => ?DESC(max_retry_time), - default => <<"10s">> + default => <<"10s">>, + importance => ?IMPORTANCE_HIDDEN } )}, {failure_strategy, @@ -186,16 +215,18 @@ fields(client_opts) -> failure_strategy(), #{ desc => ?DESC(failure_strategy), - default => force + default => force, + importance => ?IMPORTANCE_HIDDEN } )} ]; fields(listener_fields) -> - bucket_fields(?BUCKET_KEYS, listener_client_fields); + composite_bucket_fields(?BUCKET_KEYS, listener_client_fields); fields(listener_client_fields) -> - client_fields(?BUCKET_KEYS); + {Types, _} = lists:unzip(?BUCKET_KEYS), + client_fields(Types, #{required => false}); fields(Type) -> - bucket_field(Type). + simple_bucket_field(Type). desc(limiter) -> "Settings for the rate limiter."; @@ -230,19 +261,14 @@ get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. types() -> - [bytes_in, message_in, connection, message_routing, internal]. + [bytes, messages, connection, message_routing, internal]. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -%% `infinity` to `infinity_value` rules: -%% 1. all infinity capacity will change to infinity_value -%% 2. if the rate of global and bucket both are `infinity`, -%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2` -infinity_value() -> - %% 1 TB - 1099511627776. +calc_capacity(#{rate := infinity}) -> + infinity; +calc_capacity(#{burst := infinity}) -> + infinity; +calc_capacity(#{rate := Rate, burst := Burst}) -> + erlang:floor(1000 * Rate / default_period()) + Burst. %%-------------------------------------------------------------------- %% Internal functions @@ -335,7 +361,7 @@ to_quota(Str, Regex) -> {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; {match, ""} -> - {ok, infinity_value()}; + {ok, infinity}; _ -> {error, Str} end @@ -350,7 +376,8 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). -bucket_field(Type) when is_atom(Type) -> +%% A bucket with only one type +simple_bucket_field(Type) when is_atom(Type) -> fields(bucket_infinity) ++ [ {client, @@ -358,16 +385,22 @@ bucket_field(Type) when is_atom(Type) -> ?R_REF(?MODULE, client_opts), #{ desc => ?DESC(client), - required => false + required => false, + importance => importance_of_type(Type), + aliases => alias_of_type(Type) } )} ]. -bucket_fields(Types, ClientRef) -> + +%% A bucket with multi types +composite_bucket_fields(Types, ClientRef) -> [ {Type, ?HOCON(?R_REF(?MODULE, Opts), #{ desc => ?DESC(?MODULE, Type), - required => false + required => false, + importance => importance_of_type(Type), + aliases => alias_of_type(Type) })} || {Type, Opts} <- Types ] ++ @@ -382,12 +415,29 @@ bucket_fields(Types, ClientRef) -> )} ]. -client_fields(Types) -> +client_fields(Types, Meta) -> [ {Type, - ?HOCON(?R_REF(client_opts), #{ + ?HOCON(?R_REF(client_opts), Meta#{ desc => ?DESC(Type), - required => false + importance => importance_of_type(Type), + aliases => alias_of_type(Type) })} - || {Type, _} <- Types + || Type <- Types ]. + +importance_of_type(interval) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(message_routing) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(connection) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(_) -> + ?DEFAULT_IMPORTANCE. + +alias_of_type(messages) -> + [message_in]; +alias_of_type(bytess) -> + [bytes_in]; +alias_of_type(_) -> + []. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index f1daeaaeb..58db66f82 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -118,17 +118,24 @@ connect(_Id, _Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Id, Type, Cfg) -> case find_limiter_cfg(Type, Cfg) of - {undefined, _} -> + {_ClientCfg, undefined, _NodeCfg} -> {ok, emqx_htb_limiter:make_infinity_limiter()}; + {#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; + {ClientCfg, #{rate := infinity}, #{rate := infinity}} -> + {ok, + emqx_htb_limiter:make_token_bucket_limiter( + ClientCfg, emqx_limiter_bucket_ref:infinity_bucket() + )}; { - #{ - rate := BucketRate, - capacity := BucketSize - }, - #{rate := CliRate, capacity := CliSize} = ClientCfg + #{rate := CliRate} = ClientCfg, + #{rate := BucketRate} = BucketCfg, + _ } -> case emqx_limiter_manager:find_bucket(Id, Type) of {ok, Bucket} -> + BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg), + CliSize = emqx_limiter_schema:calc_capacity(ClientCfg), {ok, if CliRate < BucketRate orelse CliSize < BucketSize -> @@ -493,12 +500,14 @@ make_root(#{rate := Rate, burst := Burst}) -> produced => 0.0 }. -do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) -> +do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) -> + State; +do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) -> case maps:get(Id, Buckets, undefined) of undefined -> make_bucket(Id, Cfg, State); Bucket -> - Bucket2 = Bucket#{rate := Rate, capacity := Capacity}, + Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)}, State#{buckets := Buckets#{Id := Bucket2}} end. @@ -509,7 +518,7 @@ make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) -> }); make_bucket( Id, - #{rate := Rate, capacity := Capacity} = Cfg, + #{rate := Rate} = Cfg, #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State ) -> NewIndex = Index + 1, @@ -519,7 +528,7 @@ make_bucket( rate => Rate, obtained => Initial, correction => 0, - capacity => Capacity, + capacity => emqx_limiter_schema:calc_capacity(Cfg), counter => Counter, index => NewIndex }, @@ -541,19 +550,14 @@ do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) -> get_initial_val( #{ initial := Initial, - rate := Rate, - capacity := Capacity + rate := Rate } ) -> - %% initial will nevner be infinity(see the emqx_limiter_schema) - InfVal = emqx_limiter_schema:infinity_value(), if Initial > 0 -> Initial; Rate =/= infinity -> - erlang:min(Rate, Capacity); - Capacity =/= infinity andalso Capacity =/= InfVal -> - Capacity; + Rate; true -> 0 end. @@ -568,11 +572,12 @@ call(Type, Msg) -> end. find_limiter_cfg(Type, #{rate := _} = Cfg) -> - {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; + {find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)}; find_limiter_cfg(Type, Cfg) -> { + find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)), maps:get(Type, Cfg, undefined), - find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)) + find_node_cfg(Type) }. find_client_cfg(Type, BucketCfg) -> @@ -585,3 +590,6 @@ merge_client_cfg(NodeCfg, undefined) -> NodeCfg; merge_client_cfg(NodeCfg, BucketCfg) -> maps:merge(NodeCfg, BucketCfg). + +find_node_cfg(Type) -> + emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 20962809f..faf62f98d 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -121,8 +121,8 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(ENABLED(X), (X =/= undefined)). --define(LIMITER_BYTES_IN, bytes_in). --define(LIMITER_MESSAGE_IN, message_in). +-define(LIMITER_BYTES_IN, bytes). +-define(LIMITER_MESSAGE_IN, messages). -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [websocket_init/1]}). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index f3b97d517..7288dcf7c 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -72,7 +72,7 @@ t_consume(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 100, - capacity := 100, + burst := 0, initial := 100, max_retry_time := 1000, failure_strategy := force @@ -89,7 +89,7 @@ t_retry(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 50, - capacity := 200, + burst := 150, initial := 0, max_retry_time := 1000, failure_strategy := force @@ -109,7 +109,7 @@ t_restore(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 1, - capacity := 200, + burst := 199, initial := 50, max_retry_time := 100, failure_strategy := force @@ -129,7 +129,7 @@ t_max_retry_time(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 1, - capacity := 1, + burst := 0, max_retry_time := 500, failure_strategy := drop } @@ -139,8 +139,12 @@ t_max_retry_time(_) -> Begin = ?NOW, Result = emqx_htb_limiter:consume(101, Client), ?assertMatch({drop, _}, Result), - Time = ?NOW - Begin, - ?assert(Time >= 500 andalso Time < 550) + End = ?NOW, + Time = End - Begin, + ?assert( + Time >= 500 andalso Time < 550, + lists:flatten(io_lib:format("Begin:~p, End:~p, Time:~p~n", [Begin, End, Time])) + ) end, with_per_client(Cfg, Case). @@ -150,7 +154,7 @@ t_divisible(_) -> divisible := true, rate := ?RATE("1000/1s"), initial := 600, - capacity := 600 + burst := 0 } end, Case = fun(BucketCfg) -> @@ -176,7 +180,7 @@ t_low_watermark(_) -> low_watermark := 400, rate := ?RATE("1000/1s"), initial := 1000, - capacity := 1000 + burst := 0 } end, Case = fun(BucketCfg) -> @@ -201,8 +205,7 @@ t_infinity_client(_) -> Fun = fun(Cfg) -> Cfg end, Case = fun(Cfg) -> Client = connect(Cfg), - InfVal = emqx_limiter_schema:infinity_value(), - ?assertMatch(#{bucket := #{rate := InfVal}}, Client), + ?assertMatch(infinity, Client), Result = emqx_htb_limiter:check(100000, Client), ?assertEqual({ok, Client}, Result) end, @@ -212,12 +215,12 @@ t_try_restore_agg(_) -> Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := 1, - capacity := 200, + burst := 199, initial := 50 }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, divisible := true, max_retry_time := 100, failure_strategy := force @@ -239,11 +242,11 @@ t_short_board(_) -> Bucket2 = Bucket#{ rate := ?RATE("100/1s"), initial := 0, - capacity := 100 + burst := 0 }, Cli2 = Cli#{ rate := ?RATE("600/1s"), - capacity := 600, + burst := 0, initial := 600 }, Bucket2#{client := Cli2} @@ -261,46 +264,45 @@ t_rate(_) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, - capacity := infinity + burst := infinity }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} end, Case = fun(Cfg) -> + Time = 1000, Client = connect(Cfg), - Ts1 = erlang:system_time(millisecond), C1 = emqx_htb_limiter:available(Client), - timer:sleep(1000), - Ts2 = erlang:system_time(millisecond), + timer:sleep(1100), C2 = emqx_htb_limiter:available(Client), - ShouldInc = floor((Ts2 - Ts1) / 100) * 100, + ShouldInc = floor(Time / 100) * 100, Inc = C2 - C1, ?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate") end, with_bucket(Fun, Case). t_capacity(_) -> - Capacity = 600, + Capacity = 1200, Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, - capacity := 600 + burst := 200 }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} end, Case = fun(Cfg) -> Client = connect(Cfg), - timer:sleep(1000), + timer:sleep(1500), C1 = emqx_htb_limiter:available(Client), ?assertEqual(Capacity, C1, "test bucket capacity") end, @@ -318,11 +320,11 @@ t_collaborative_alloc(_) -> Bucket2 = Bucket#{ rate := ?RATE("400/1s"), initial := 0, - capacity := 600 + burst := 200 }, Cli2 = Cli#{ rate := ?RATE("50"), - capacity := 100, + burst := 50, initial := 100 }, Bucket2#{client := Cli2} @@ -363,11 +365,11 @@ t_burst(_) -> Bucket2 = Bucket#{ rate := ?RATE("200/1s"), initial := 0, - capacity := 200 + burst := 0 }, Cli2 = Cli#{ rate := ?RATE("50/1s"), - capacity := 200, + burst := 150, divisible := true }, Bucket2#{client := Cli2} @@ -401,11 +403,11 @@ t_limit_global_with_unlimit_other(_) -> Bucket2 = Bucket#{ rate := infinity, initial := 0, - capacity := infinity + burst := infinity }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} @@ -414,7 +416,7 @@ t_limit_global_with_unlimit_other(_) -> Case = fun() -> C1 = counters:new(1, []), start_client({b1, Bucket}, ?NOW + 2000, C1, 20), - timer:sleep(2100), + timer:sleep(2200), check_average_rate(C1, 2, 600) end, @@ -432,7 +434,7 @@ t_check_container(_) -> Cfg#{ rate := ?RATE("1000/1s"), initial := 1000, - capacity := 1000 + burst := 0 } end, Case = fun(#{client := Client} = BucketCfg) -> @@ -565,7 +567,7 @@ t_schema_unit(_) -> ?assertMatch({error, _}, M:to_rate("100MB/1")), ?assertMatch({error, _}, M:to_rate("100/10x")), - ?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")), + ?assertEqual({ok, infinity}, M:to_capacity("infinity")), ?assertEqual({ok, 100}, M:to_capacity("100")), ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), @@ -748,17 +750,16 @@ connect(Name, Cfg) -> Limiter. make_limiter_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), Client = #{ - rate => Infinity, + rate => infinity, initial => 0, - capacity => Infinity, + burst => infinity, low_watermark => 0, divisible => false, max_retry_time => timer:seconds(5), failure_strategy => force }, - #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + #{client => Client, rate => infinity, initial => 0, burst => infinity}. add_bucket(Cfg) -> add_bucket(?MODULE, Cfg). diff --git a/rel/i18n/emqx_limiter_schema.hocon b/rel/i18n/emqx_limiter_schema.hocon index 3657df694..2874999a5 100644 --- a/rel/i18n/emqx_limiter_schema.hocon +++ b/rel/i18n/emqx_limiter_schema.hocon @@ -33,28 +33,6 @@ emqx_limiter_schema { } } - client_bucket_capacity { - desc { - en: """The capacity of per user.""" - zh: """每个使用者的令牌容量上限""" - } - label: { - en: """Capacity""" - zh: """容量""" - } - } - - capacity { - desc { - en: """The capacity of this token bucket.""" - zh: """该令牌桶的容量""" - } - label: { - en: """Capacity""" - zh: """容量""" - } - } - low_watermark { desc { en: """If the remaining tokens are lower than this value, @@ -152,30 +130,30 @@ Once the limit is reached, new connections will be refused""" } } - message_in { + messages { desc { - en: """The message in limiter. + en: """The messages limiter. This is used to limit the inbound message numbers for this EMQX node Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入速率控制器。 这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" } label: { - en: """Message In""" + en: """Messages""" zh: """消息流入速率""" } } - bytes_in { + bytes { desc { - en: """The bytes_in limiter. + en: """The bytes limiter. This is used to limit the inbound bytes rate for this EMQX node. Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入字节率控制器。 这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" } label: { - en: """Bytes In""" + en: """Bytes""" zh: """流入字节率""" } }