From 55376144ce183bb39669cd4f1bdd87bf8ab5e658 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 17 Apr 2023 10:06:36 +0800 Subject: [PATCH 1/3] fix(limiter): simplify the configuration of the limiter --- apps/emqx/src/emqx_connection.erl | 6 +- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 3 +- .../src/emqx_limiter_bucket_ref.erl | 5 + .../emqx_limiter/src/emqx_limiter_schema.erl | 152 ++++++++++++------ .../emqx_limiter/src/emqx_limiter_server.erl | 46 +++--- apps/emqx/src/emqx_ws_connection.erl | 4 +- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 75 ++++----- rel/i18n/emqx_limiter_schema.hocon | 34 +--- 8 files changed, 183 insertions(+), 142 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 8d47f033c..27b6f3e84 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -182,10 +182,8 @@ -define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -%% use macro to do compile time limiter's type check --define(LIMITER_BYTES_IN, bytes_in). --define(LIMITER_MESSAGE_IN, message_in). --define(EMPTY_QUEUE, {[], []}). +-define(LIMITER_BYTES_IN, bytes). +-define(LIMITER_MESSAGE_IN, messages). -dialyzer({no_match, [info/2]}). -dialyzer( diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index bbebd9460..53f26deb5 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -139,7 +139,8 @@ make_token_bucket_limiter(Cfg, Bucket) -> Cfg#{ tokens => emqx_limiter_server:get_initial_val(Cfg), lasttime => ?NOW, - bucket => Bucket + bucket => Bucket, + capacity => emqx_limiter_schema:calc_capacity(Cfg) }. %%@doc create a limiter server's reference diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index fe30e41e9..139564df7 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -23,6 +23,7 @@ %% API -export([ new/3, + infinity_bucket/0, check/3, try_restore/2, available/1 @@ -58,6 +59,10 @@ new(Counter, Index, Rate) -> rate => Rate }. +-spec infinity_bucket() -> bucket_ref(). +infinity_bucket() -> + infinity. + %% @doc check tokens -spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) -> HasToken :: diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index f45fc55b6..f59ddc35b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -31,20 +31,20 @@ get_bucket_cfg_path/2, desc/1, types/0, - infinity_value/0 + calc_capacity/1 ]). -define(KILOBYTE, 1024). -define(BUCKET_KEYS, [ - {bytes_in, bucket_infinity}, - {message_in, bucket_infinity}, + {bytes, bucket_infinity}, + {messages, bucket_infinity}, {connection, bucket_limit}, {message_routing, bucket_infinity} ]). -type limiter_type() :: - bytes_in - | message_in + bytes + | messages | connection | message_routing %% internal limiter for unclassified resources @@ -90,14 +90,17 @@ namespace() -> limiter. -roots() -> [limiter]. +roots() -> + [{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}]. fields(limiter) -> [ {Type, ?HOCON(?R_REF(node_opts), #{ desc => ?DESC(Type), - default => #{} + default => #{}, + importance => ?IMPORTANCE_HIDDEN, + aliases => alias_of_type(Type) })} || Type <- types() ] ++ @@ -107,6 +110,7 @@ fields(limiter) -> ?R_REF(client_fields), #{ desc => ?DESC(client), + importance => ?IMPORTANCE_HIDDEN, default => maps:from_list([ {erlang:atom_to_binary(Type), #{}} || Type <- types() @@ -124,30 +128,50 @@ fields(node_opts) -> })} ]; fields(client_fields) -> - [ - {Type, - ?HOCON(?R_REF(client_opts), #{ - desc => ?DESC(Type), - default => #{} - })} - || Type <- types() - ]; + client_fields(types(), #{default => #{}}); fields(bucket_infinity) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})}, - {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"infinity">>})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} + {burst, + ?HOCON(capacity(), #{ + desc => ?DESC(capacity), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] + })}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })} ]; fields(bucket_limit) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})}, - {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"1000">>})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} + {burst, + ?HOCON(capacity(), #{ + desc => ?DESC(burst), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] + })}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })} ]; fields(client_opts) -> [ {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})}, - {initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})}, + {initial, + ?HOCON(initial(), #{ + default => <<"0">>, + desc => ?DESC(initial), + importance => ?IMPORTANCE_HIDDEN + })}, %% low_watermark add for emqx_channel and emqx_session %% both modules consume first and then check %% so we need to use this value to prevent excessive consumption @@ -157,20 +181,24 @@ fields(client_opts) -> initial(), #{ desc => ?DESC(low_watermark), - default => <<"0">> + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN } )}, - {capacity, + {burst, ?HOCON(capacity(), #{ - desc => ?DESC(client_bucket_capacity), - default => <<"infinity">> + desc => ?DESC(burst), + default => <<"0">>, + importance => ?IMPORTANCE_HIDDEN, + aliases => [capacity] })}, {divisible, ?HOCON( boolean(), #{ desc => ?DESC(divisible), - default => false + default => false, + importance => ?IMPORTANCE_HIDDEN } )}, {max_retry_time, @@ -178,7 +206,8 @@ fields(client_opts) -> emqx_schema:duration(), #{ desc => ?DESC(max_retry_time), - default => <<"10s">> + default => <<"10s">>, + importance => ?IMPORTANCE_HIDDEN } )}, {failure_strategy, @@ -186,16 +215,18 @@ fields(client_opts) -> failure_strategy(), #{ desc => ?DESC(failure_strategy), - default => force + default => force, + importance => ?IMPORTANCE_HIDDEN } )} ]; fields(listener_fields) -> - bucket_fields(?BUCKET_KEYS, listener_client_fields); + composite_bucket_fields(?BUCKET_KEYS, listener_client_fields); fields(listener_client_fields) -> - client_fields(?BUCKET_KEYS); + {Types, _} = lists:unzip(?BUCKET_KEYS), + client_fields(Types, #{required => false}); fields(Type) -> - bucket_field(Type). + simple_bucket_field(Type). desc(limiter) -> "Settings for the rate limiter."; @@ -230,19 +261,14 @@ get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. types() -> - [bytes_in, message_in, connection, message_routing, internal]. + [bytes, messages, connection, message_routing, internal]. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -%% `infinity` to `infinity_value` rules: -%% 1. all infinity capacity will change to infinity_value -%% 2. if the rate of global and bucket both are `infinity`, -%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2` -infinity_value() -> - %% 1 TB - 1099511627776. +calc_capacity(#{rate := infinity}) -> + infinity; +calc_capacity(#{burst := infinity}) -> + infinity; +calc_capacity(#{rate := Rate, burst := Burst}) -> + erlang:floor(1000 * Rate / default_period()) + Burst. %%-------------------------------------------------------------------- %% Internal functions @@ -335,7 +361,7 @@ to_quota(Str, Regex) -> {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; {match, ""} -> - {ok, infinity_value()}; + {ok, infinity}; _ -> {error, Str} end @@ -350,7 +376,8 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). -bucket_field(Type) when is_atom(Type) -> +%% A bucket with only one type +simple_bucket_field(Type) when is_atom(Type) -> fields(bucket_infinity) ++ [ {client, @@ -358,16 +385,22 @@ bucket_field(Type) when is_atom(Type) -> ?R_REF(?MODULE, client_opts), #{ desc => ?DESC(client), - required => false + required => false, + importance => importance_of_type(Type), + aliases => alias_of_type(Type) } )} ]. -bucket_fields(Types, ClientRef) -> + +%% A bucket with multi types +composite_bucket_fields(Types, ClientRef) -> [ {Type, ?HOCON(?R_REF(?MODULE, Opts), #{ desc => ?DESC(?MODULE, Type), - required => false + required => false, + importance => importance_of_type(Type), + aliases => alias_of_type(Type) })} || {Type, Opts} <- Types ] ++ @@ -382,12 +415,29 @@ bucket_fields(Types, ClientRef) -> )} ]. -client_fields(Types) -> +client_fields(Types, Meta) -> [ {Type, - ?HOCON(?R_REF(client_opts), #{ + ?HOCON(?R_REF(client_opts), Meta#{ desc => ?DESC(Type), - required => false + importance => importance_of_type(Type), + aliases => alias_of_type(Type) })} - || {Type, _} <- Types + || Type <- Types ]. + +importance_of_type(interval) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(message_routing) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(connection) -> + ?IMPORTANCE_HIDDEN; +importance_of_type(_) -> + ?DEFAULT_IMPORTANCE. + +alias_of_type(messages) -> + [message_in]; +alias_of_type(bytess) -> + [bytes_in]; +alias_of_type(_) -> + []. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index f1daeaaeb..58db66f82 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -118,17 +118,24 @@ connect(_Id, _Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Id, Type, Cfg) -> case find_limiter_cfg(Type, Cfg) of - {undefined, _} -> + {_ClientCfg, undefined, _NodeCfg} -> {ok, emqx_htb_limiter:make_infinity_limiter()}; + {#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; + {ClientCfg, #{rate := infinity}, #{rate := infinity}} -> + {ok, + emqx_htb_limiter:make_token_bucket_limiter( + ClientCfg, emqx_limiter_bucket_ref:infinity_bucket() + )}; { - #{ - rate := BucketRate, - capacity := BucketSize - }, - #{rate := CliRate, capacity := CliSize} = ClientCfg + #{rate := CliRate} = ClientCfg, + #{rate := BucketRate} = BucketCfg, + _ } -> case emqx_limiter_manager:find_bucket(Id, Type) of {ok, Bucket} -> + BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg), + CliSize = emqx_limiter_schema:calc_capacity(ClientCfg), {ok, if CliRate < BucketRate orelse CliSize < BucketSize -> @@ -493,12 +500,14 @@ make_root(#{rate := Rate, burst := Burst}) -> produced => 0.0 }. -do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) -> +do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) -> + State; +do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) -> case maps:get(Id, Buckets, undefined) of undefined -> make_bucket(Id, Cfg, State); Bucket -> - Bucket2 = Bucket#{rate := Rate, capacity := Capacity}, + Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)}, State#{buckets := Buckets#{Id := Bucket2}} end. @@ -509,7 +518,7 @@ make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) -> }); make_bucket( Id, - #{rate := Rate, capacity := Capacity} = Cfg, + #{rate := Rate} = Cfg, #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State ) -> NewIndex = Index + 1, @@ -519,7 +528,7 @@ make_bucket( rate => Rate, obtained => Initial, correction => 0, - capacity => Capacity, + capacity => emqx_limiter_schema:calc_capacity(Cfg), counter => Counter, index => NewIndex }, @@ -541,19 +550,14 @@ do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) -> get_initial_val( #{ initial := Initial, - rate := Rate, - capacity := Capacity + rate := Rate } ) -> - %% initial will nevner be infinity(see the emqx_limiter_schema) - InfVal = emqx_limiter_schema:infinity_value(), if Initial > 0 -> Initial; Rate =/= infinity -> - erlang:min(Rate, Capacity); - Capacity =/= infinity andalso Capacity =/= InfVal -> - Capacity; + Rate; true -> 0 end. @@ -568,11 +572,12 @@ call(Type, Msg) -> end. find_limiter_cfg(Type, #{rate := _} = Cfg) -> - {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; + {find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)}; find_limiter_cfg(Type, Cfg) -> { + find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)), maps:get(Type, Cfg, undefined), - find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)) + find_node_cfg(Type) }. find_client_cfg(Type, BucketCfg) -> @@ -585,3 +590,6 @@ merge_client_cfg(NodeCfg, undefined) -> NodeCfg; merge_client_cfg(NodeCfg, BucketCfg) -> maps:merge(NodeCfg, BucketCfg). + +find_node_cfg(Type) -> + emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 20962809f..faf62f98d 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -121,8 +121,8 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(ENABLED(X), (X =/= undefined)). --define(LIMITER_BYTES_IN, bytes_in). --define(LIMITER_MESSAGE_IN, message_in). +-define(LIMITER_BYTES_IN, bytes). +-define(LIMITER_MESSAGE_IN, messages). -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [websocket_init/1]}). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index f3b97d517..7288dcf7c 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -72,7 +72,7 @@ t_consume(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 100, - capacity := 100, + burst := 0, initial := 100, max_retry_time := 1000, failure_strategy := force @@ -89,7 +89,7 @@ t_retry(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 50, - capacity := 200, + burst := 150, initial := 0, max_retry_time := 1000, failure_strategy := force @@ -109,7 +109,7 @@ t_restore(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 1, - capacity := 200, + burst := 199, initial := 50, max_retry_time := 100, failure_strategy := force @@ -129,7 +129,7 @@ t_max_retry_time(_) -> Cfg = fun(Cfg) -> Cfg#{ rate := 1, - capacity := 1, + burst := 0, max_retry_time := 500, failure_strategy := drop } @@ -139,8 +139,12 @@ t_max_retry_time(_) -> Begin = ?NOW, Result = emqx_htb_limiter:consume(101, Client), ?assertMatch({drop, _}, Result), - Time = ?NOW - Begin, - ?assert(Time >= 500 andalso Time < 550) + End = ?NOW, + Time = End - Begin, + ?assert( + Time >= 500 andalso Time < 550, + lists:flatten(io_lib:format("Begin:~p, End:~p, Time:~p~n", [Begin, End, Time])) + ) end, with_per_client(Cfg, Case). @@ -150,7 +154,7 @@ t_divisible(_) -> divisible := true, rate := ?RATE("1000/1s"), initial := 600, - capacity := 600 + burst := 0 } end, Case = fun(BucketCfg) -> @@ -176,7 +180,7 @@ t_low_watermark(_) -> low_watermark := 400, rate := ?RATE("1000/1s"), initial := 1000, - capacity := 1000 + burst := 0 } end, Case = fun(BucketCfg) -> @@ -201,8 +205,7 @@ t_infinity_client(_) -> Fun = fun(Cfg) -> Cfg end, Case = fun(Cfg) -> Client = connect(Cfg), - InfVal = emqx_limiter_schema:infinity_value(), - ?assertMatch(#{bucket := #{rate := InfVal}}, Client), + ?assertMatch(infinity, Client), Result = emqx_htb_limiter:check(100000, Client), ?assertEqual({ok, Client}, Result) end, @@ -212,12 +215,12 @@ t_try_restore_agg(_) -> Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := 1, - capacity := 200, + burst := 199, initial := 50 }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, divisible := true, max_retry_time := 100, failure_strategy := force @@ -239,11 +242,11 @@ t_short_board(_) -> Bucket2 = Bucket#{ rate := ?RATE("100/1s"), initial := 0, - capacity := 100 + burst := 0 }, Cli2 = Cli#{ rate := ?RATE("600/1s"), - capacity := 600, + burst := 0, initial := 600 }, Bucket2#{client := Cli2} @@ -261,46 +264,45 @@ t_rate(_) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, - capacity := infinity + burst := infinity }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} end, Case = fun(Cfg) -> + Time = 1000, Client = connect(Cfg), - Ts1 = erlang:system_time(millisecond), C1 = emqx_htb_limiter:available(Client), - timer:sleep(1000), - Ts2 = erlang:system_time(millisecond), + timer:sleep(1100), C2 = emqx_htb_limiter:available(Client), - ShouldInc = floor((Ts2 - Ts1) / 100) * 100, + ShouldInc = floor(Time / 100) * 100, Inc = C2 - C1, ?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate") end, with_bucket(Fun, Case). t_capacity(_) -> - Capacity = 600, + Capacity = 1200, Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, - capacity := 600 + burst := 200 }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} end, Case = fun(Cfg) -> Client = connect(Cfg), - timer:sleep(1000), + timer:sleep(1500), C1 = emqx_htb_limiter:available(Client), ?assertEqual(Capacity, C1, "test bucket capacity") end, @@ -318,11 +320,11 @@ t_collaborative_alloc(_) -> Bucket2 = Bucket#{ rate := ?RATE("400/1s"), initial := 0, - capacity := 600 + burst := 200 }, Cli2 = Cli#{ rate := ?RATE("50"), - capacity := 100, + burst := 50, initial := 100 }, Bucket2#{client := Cli2} @@ -363,11 +365,11 @@ t_burst(_) -> Bucket2 = Bucket#{ rate := ?RATE("200/1s"), initial := 0, - capacity := 200 + burst := 0 }, Cli2 = Cli#{ rate := ?RATE("50/1s"), - capacity := 200, + burst := 150, divisible := true }, Bucket2#{client := Cli2} @@ -401,11 +403,11 @@ t_limit_global_with_unlimit_other(_) -> Bucket2 = Bucket#{ rate := infinity, initial := 0, - capacity := infinity + burst := infinity }, Cli2 = Cli#{ rate := infinity, - capacity := infinity, + burst := infinity, initial := 0 }, Bucket2#{client := Cli2} @@ -414,7 +416,7 @@ t_limit_global_with_unlimit_other(_) -> Case = fun() -> C1 = counters:new(1, []), start_client({b1, Bucket}, ?NOW + 2000, C1, 20), - timer:sleep(2100), + timer:sleep(2200), check_average_rate(C1, 2, 600) end, @@ -432,7 +434,7 @@ t_check_container(_) -> Cfg#{ rate := ?RATE("1000/1s"), initial := 1000, - capacity := 1000 + burst := 0 } end, Case = fun(#{client := Client} = BucketCfg) -> @@ -565,7 +567,7 @@ t_schema_unit(_) -> ?assertMatch({error, _}, M:to_rate("100MB/1")), ?assertMatch({error, _}, M:to_rate("100/10x")), - ?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")), + ?assertEqual({ok, infinity}, M:to_capacity("infinity")), ?assertEqual({ok, 100}, M:to_capacity("100")), ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), @@ -748,17 +750,16 @@ connect(Name, Cfg) -> Limiter. make_limiter_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), Client = #{ - rate => Infinity, + rate => infinity, initial => 0, - capacity => Infinity, + burst => infinity, low_watermark => 0, divisible => false, max_retry_time => timer:seconds(5), failure_strategy => force }, - #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + #{client => Client, rate => infinity, initial => 0, burst => infinity}. add_bucket(Cfg) -> add_bucket(?MODULE, Cfg). diff --git a/rel/i18n/emqx_limiter_schema.hocon b/rel/i18n/emqx_limiter_schema.hocon index 3657df694..2874999a5 100644 --- a/rel/i18n/emqx_limiter_schema.hocon +++ b/rel/i18n/emqx_limiter_schema.hocon @@ -33,28 +33,6 @@ emqx_limiter_schema { } } - client_bucket_capacity { - desc { - en: """The capacity of per user.""" - zh: """每个使用者的令牌容量上限""" - } - label: { - en: """Capacity""" - zh: """容量""" - } - } - - capacity { - desc { - en: """The capacity of this token bucket.""" - zh: """该令牌桶的容量""" - } - label: { - en: """Capacity""" - zh: """容量""" - } - } - low_watermark { desc { en: """If the remaining tokens are lower than this value, @@ -152,30 +130,30 @@ Once the limit is reached, new connections will be refused""" } } - message_in { + messages { desc { - en: """The message in limiter. + en: """The messages limiter. This is used to limit the inbound message numbers for this EMQX node Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入速率控制器。 这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" } label: { - en: """Message In""" + en: """Messages""" zh: """消息流入速率""" } } - bytes_in { + bytes { desc { - en: """The bytes_in limiter. + en: """The bytes limiter. This is used to limit the inbound bytes rate for this EMQX node. Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入字节率控制器。 这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" } label: { - en: """Bytes In""" + en: """Bytes""" zh: """流入字节率""" } } From 2a54d93c7e05851b994396d23214a33a5a600002 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 12 Apr 2023 17:42:23 +0800 Subject: [PATCH 2/3] chore: update changes --- changes/ce/perf-10376.en.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changes/ce/perf-10376.en.md diff --git a/changes/ce/perf-10376.en.md b/changes/ce/perf-10376.en.md new file mode 100644 index 000000000..d585ad5b2 --- /dev/null +++ b/changes/ce/perf-10376.en.md @@ -0,0 +1,6 @@ +Simplify the configuration of the limiter feature and optimize some codes +- Rename `message_in` to `messages` +- Rename `bytes_in` to `bytes` +- Use `burst` instead of `capacity` +- Hide non-importance fields +- Optimize limiter instances in different rate settings From 02f8d073f8dc125675bde9cc67bd5b8c8d013a17 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 13 Apr 2023 11:45:51 +0800 Subject: [PATCH 3/3] test(limiter): fix test errors and make spellcheck happy --- .../emqx_limiter/src/emqx_limiter_schema.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 9 +++--- apps/emqx/test/emqx_connection_SUITE.erl | 22 +++++++------- apps/emqx/test/emqx_ws_connection_SUITE.erl | 29 +++++++++---------- .../test/emqx_retainer_SUITE.erl | 9 +++--- rel/i18n/emqx_limiter_schema.hocon | 4 +-- 6 files changed, 34 insertions(+), 41 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index f59ddc35b..730559f80 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -437,7 +437,7 @@ importance_of_type(_) -> alias_of_type(messages) -> [message_in]; -alias_of_type(bytess) -> +alias_of_type(bytes) -> [bytes_in]; alias_of_type(_) -> []. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 29f8b1503..eccb5c865 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -162,8 +162,7 @@ limiter_conf() -> Make = fun() -> #{ burst => 0, - rate => infinity, - capacity => infinity + rate => infinity } end, @@ -172,7 +171,7 @@ limiter_conf() -> Acc#{Name => Make()} end, #{}, - [bytes_in, message_in, message_routing, connection, internal] + [bytes, messages, message_routing, connection, internal] ). stats_conf() -> @@ -1258,7 +1257,7 @@ limiter_cfg() -> Client = #{ rate => 5, initial => 0, - capacity => 5, + burst => 0, low_watermark => 1, divisible => false, max_retry_time => timer:seconds(5), @@ -1270,7 +1269,7 @@ limiter_cfg() -> }. bucket_cfg() -> - #{rate => 10, initial => 0, capacity => 10}. + #{rate => 10, initial => 0, burst => 0}. add_bucket() -> emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()). diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 21ed45119..f24c1c895 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -427,7 +427,7 @@ t_ensure_rate_limit(_) -> fun(_, Client) -> {pause, 3000, undefined, Client} end ), {ok, State2} = emqx_connection:check_limiter( - [{1000, bytes_in}], + [{1000, bytes}], [], WhenOk, [], @@ -703,31 +703,29 @@ handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St). -define(LIMITER_ID, 'tcp:default'). init_limiter() -> - emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()). + emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes, messages], limiter_cfg()). limiter_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), Cfg = bucket_cfg(), Client = #{ - rate => Infinity, + rate => infinity, initial => 0, - capacity => Infinity, + burst => 0, low_watermark => 1, divisible => false, max_retry_time => timer:seconds(5), failure_strategy => force }, - #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. + #{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}. bucket_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), - #{rate => Infinity, initial => 0, capacity => Infinity}. + #{rate => infinity, initial => 0, burst => 0}. add_bucket() -> Cfg = bucket_cfg(), - emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), - emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg). + emqx_limiter_server:add_bucket(?LIMITER_ID, bytes, Cfg), + emqx_limiter_server:add_bucket(?LIMITER_ID, messages, Cfg). del_bucket() -> - emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in), - emqx_limiter_server:del_bucket(?LIMITER_ID, message_in). + emqx_limiter_server:del_bucket(?LIMITER_ID, bytes), + emqx_limiter_server:del_bucket(?LIMITER_ID, messages). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index de8b1c9af..1ae23361e 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -509,16 +509,16 @@ t_handle_timeout_emit_stats(_) -> t_ensure_rate_limit(_) -> {ok, Rate} = emqx_limiter_schema:to_rate("50MB"), Limiter = init_limiter(#{ - bytes_in => bucket_cfg(), - message_in => bucket_cfg(), - client => #{bytes_in => client_cfg(Rate)} + bytes => bucket_cfg(), + messages => bucket_cfg(), + client => #{bytes => client_cfg(Rate)} }), St = st(#{limiter => Limiter}), %% must bigger than value in emqx_ratelimit_SUITE {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), St1 = ?ws_conn:check_limiter( - [{Need, bytes_in}], + [{Need, bytes}], [], fun(_, _, S) -> S end, [], @@ -699,23 +699,21 @@ init_limiter() -> init_limiter(limiter_cfg()). init_limiter(LimiterCfg) -> - emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg). + emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes, messages], LimiterCfg). limiter_cfg() -> Cfg = bucket_cfg(), Client = client_cfg(), - #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. + #{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}. client_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), - client_cfg(Infinity). + client_cfg(infinity). client_cfg(Rate) -> - Infinity = emqx_limiter_schema:infinity_value(), #{ rate => Rate, initial => 0, - capacity => Infinity, + burst => 0, low_watermark => 1, divisible => false, max_retry_time => timer:seconds(5), @@ -723,14 +721,13 @@ client_cfg(Rate) -> }. bucket_cfg() -> - Infinity = emqx_limiter_schema:infinity_value(), - #{rate => Infinity, initial => 0, capacity => Infinity}. + #{rate => infinity, initial => 0, burst => 0}. add_bucket() -> Cfg = bucket_cfg(), - emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), - emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg). + emqx_limiter_server:add_bucket(?LIMITER_ID, bytes, Cfg), + emqx_limiter_server:add_bucket(?LIMITER_ID, messages, Cfg). del_bucket() -> - emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in), - emqx_limiter_server:del_bucket(?LIMITER_ID, message_in). + emqx_limiter_server:del_bucket(?LIMITER_ID, bytes), + emqx_limiter_server:del_bucket(?LIMITER_ID, messages). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 09d1f77da..c90ec6b2b 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -758,23 +758,22 @@ with_conf(ConfMod, Case) -> end. make_limiter_cfg(Rate) -> - Infinity = emqx_limiter_schema:infinity_value(), Client = #{ rate => Rate, initial => 0, - capacity => Infinity, + burst => 0, low_watermark => 1, divisible => false, max_retry_time => timer:seconds(5), failure_strategy => force }, - #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + #{client => Client, rate => Rate, initial => 0, burst => 0}. make_limiter_json(Rate) -> Client = #{ <<"rate">> => Rate, <<"initial">> => 0, - <<"capacity">> => <<"infinity">>, + <<"burst">> => <<"0">>, <<"low_watermark">> => 0, <<"divisible">> => <<"false">>, <<"max_retry_time">> => <<"5s">>, @@ -784,5 +783,5 @@ make_limiter_json(Rate) -> <<"client">> => Client, <<"rate">> => <<"infinity">>, <<"initial">> => 0, - <<"capacity">> => <<"infinity">> + <<"burst">> => <<"0">> }. diff --git a/rel/i18n/emqx_limiter_schema.hocon b/rel/i18n/emqx_limiter_schema.hocon index 2874999a5..37eb4ee1e 100644 --- a/rel/i18n/emqx_limiter_schema.hocon +++ b/rel/i18n/emqx_limiter_schema.hocon @@ -132,7 +132,7 @@ Once the limit is reached, new connections will be refused""" messages { desc { - en: """The messages limiter. + en: """The `messages` limiter. This is used to limit the inbound message numbers for this EMQX node Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入速率控制器。 @@ -146,7 +146,7 @@ Once the limit is reached, the restricted client will be slow down even be hung bytes { desc { - en: """The bytes limiter. + en: """The `bytes` limiter. This is used to limit the inbound bytes rate for this EMQX node. Once the limit is reached, the restricted client will be slow down even be hung for a while.""" zh: """流入字节率控制器。