fix(limiter): simplify the configuration of the limiter

This commit is contained in:
firest 2023-04-17 10:06:36 +08:00
parent 69d1a35c90
commit 55376144ce
8 changed files with 183 additions and 142 deletions

View File

@ -182,10 +182,8 @@
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). -define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
%% use macro to do compile time limiter's type check -define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_BYTES_IN, bytes_in). -define(LIMITER_MESSAGE_IN, messages).
-define(LIMITER_MESSAGE_IN, message_in).
-define(EMPTY_QUEUE, {[], []}).
-dialyzer({no_match, [info/2]}). -dialyzer({no_match, [info/2]}).
-dialyzer( -dialyzer(

View File

@ -139,7 +139,8 @@ make_token_bucket_limiter(Cfg, Bucket) ->
Cfg#{ Cfg#{
tokens => emqx_limiter_server:get_initial_val(Cfg), tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW, lasttime => ?NOW,
bucket => Bucket bucket => Bucket,
capacity => emqx_limiter_schema:calc_capacity(Cfg)
}. }.
%%@doc create a limiter server's reference %%@doc create a limiter server's reference

View File

@ -23,6 +23,7 @@
%% API %% API
-export([ -export([
new/3, new/3,
infinity_bucket/0,
check/3, check/3,
try_restore/2, try_restore/2,
available/1 available/1
@ -58,6 +59,10 @@ new(Counter, Index, Rate) ->
rate => Rate rate => Rate
}. }.
-spec infinity_bucket() -> bucket_ref().
infinity_bucket() ->
infinity.
%% @doc check tokens %% @doc check tokens
-spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) -> -spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) ->
HasToken :: HasToken ::

View File

@ -31,20 +31,20 @@
get_bucket_cfg_path/2, get_bucket_cfg_path/2,
desc/1, desc/1,
types/0, types/0,
infinity_value/0 calc_capacity/1
]). ]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
-define(BUCKET_KEYS, [ -define(BUCKET_KEYS, [
{bytes_in, bucket_infinity}, {bytes, bucket_infinity},
{message_in, bucket_infinity}, {messages, bucket_infinity},
{connection, bucket_limit}, {connection, bucket_limit},
{message_routing, bucket_infinity} {message_routing, bucket_infinity}
]). ]).
-type limiter_type() :: -type limiter_type() ::
bytes_in bytes
| message_in | messages
| connection | connection
| message_routing | message_routing
%% internal limiter for unclassified resources %% internal limiter for unclassified resources
@ -90,14 +90,17 @@
namespace() -> limiter. namespace() -> limiter.
roots() -> [limiter]. roots() ->
[{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
fields(limiter) -> fields(limiter) ->
[ [
{Type, {Type,
?HOCON(?R_REF(node_opts), #{ ?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type), desc => ?DESC(Type),
default => #{} default => #{},
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})} })}
|| Type <- types() || Type <- types()
] ++ ] ++
@ -107,6 +110,7 @@ fields(limiter) ->
?R_REF(client_fields), ?R_REF(client_fields),
#{ #{
desc => ?DESC(client), desc => ?DESC(client),
importance => ?IMPORTANCE_HIDDEN,
default => maps:from_list([ default => maps:from_list([
{erlang:atom_to_binary(Type), #{}} {erlang:atom_to_binary(Type), #{}}
|| Type <- types() || Type <- types()
@ -124,30 +128,50 @@ fields(node_opts) ->
})} })}
]; ];
fields(client_fields) -> fields(client_fields) ->
[ client_fields(types(), #{default => #{}});
{Type,
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
default => #{}
})}
|| Type <- types()
];
fields(bucket_infinity) -> fields(bucket_infinity) ->
[ [
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})}, {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})},
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"infinity">>})}, {burst,
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} ?HOCON(capacity(), #{
desc => ?DESC(capacity),
default => <<"0">>,
importance => ?IMPORTANCE_HIDDEN,
aliases => [capacity]
})},
{initial,
?HOCON(initial(), #{
default => <<"0">>,
desc => ?DESC(initial),
importance => ?IMPORTANCE_HIDDEN
})}
]; ];
fields(bucket_limit) -> fields(bucket_limit) ->
[ [
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})}, {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})},
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"1000">>})}, {burst,
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})} ?HOCON(capacity(), #{
desc => ?DESC(burst),
default => <<"0">>,
importance => ?IMPORTANCE_HIDDEN,
aliases => [capacity]
})},
{initial,
?HOCON(initial(), #{
default => <<"0">>,
desc => ?DESC(initial),
importance => ?IMPORTANCE_HIDDEN
})}
]; ];
fields(client_opts) -> fields(client_opts) ->
[ [
{rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})}, {rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})},
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})}, {initial,
?HOCON(initial(), #{
default => <<"0">>,
desc => ?DESC(initial),
importance => ?IMPORTANCE_HIDDEN
})},
%% low_watermark add for emqx_channel and emqx_session %% low_watermark add for emqx_channel and emqx_session
%% both modules consume first and then check %% both modules consume first and then check
%% so we need to use this value to prevent excessive consumption %% so we need to use this value to prevent excessive consumption
@ -157,20 +181,24 @@ fields(client_opts) ->
initial(), initial(),
#{ #{
desc => ?DESC(low_watermark), desc => ?DESC(low_watermark),
default => <<"0">> default => <<"0">>,
importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
{capacity, {burst,
?HOCON(capacity(), #{ ?HOCON(capacity(), #{
desc => ?DESC(client_bucket_capacity), desc => ?DESC(burst),
default => <<"infinity">> default => <<"0">>,
importance => ?IMPORTANCE_HIDDEN,
aliases => [capacity]
})}, })},
{divisible, {divisible,
?HOCON( ?HOCON(
boolean(), boolean(),
#{ #{
desc => ?DESC(divisible), desc => ?DESC(divisible),
default => false default => false,
importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
{max_retry_time, {max_retry_time,
@ -178,7 +206,8 @@ fields(client_opts) ->
emqx_schema:duration(), emqx_schema:duration(),
#{ #{
desc => ?DESC(max_retry_time), desc => ?DESC(max_retry_time),
default => <<"10s">> default => <<"10s">>,
importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
{failure_strategy, {failure_strategy,
@ -186,16 +215,18 @@ fields(client_opts) ->
failure_strategy(), failure_strategy(),
#{ #{
desc => ?DESC(failure_strategy), desc => ?DESC(failure_strategy),
default => force default => force,
importance => ?IMPORTANCE_HIDDEN
} }
)} )}
]; ];
fields(listener_fields) -> fields(listener_fields) ->
bucket_fields(?BUCKET_KEYS, listener_client_fields); composite_bucket_fields(?BUCKET_KEYS, listener_client_fields);
fields(listener_client_fields) -> fields(listener_client_fields) ->
client_fields(?BUCKET_KEYS); {Types, _} = lists:unzip(?BUCKET_KEYS),
client_fields(Types, #{required => false});
fields(Type) -> fields(Type) ->
bucket_field(Type). simple_bucket_field(Type).
desc(limiter) -> desc(limiter) ->
"Settings for the rate limiter."; "Settings for the rate limiter.";
@ -230,19 +261,14 @@ get_bucket_cfg_path(Type, BucketName) ->
[limiter, Type, bucket, BucketName]. [limiter, Type, bucket, BucketName].
types() -> types() ->
[bytes_in, message_in, connection, message_routing, internal]. [bytes, messages, connection, message_routing, internal].
%%-------------------------------------------------------------------- calc_capacity(#{rate := infinity}) ->
%% Internal functions infinity;
%%-------------------------------------------------------------------- calc_capacity(#{burst := infinity}) ->
infinity;
%% `infinity` to `infinity_value` rules: calc_capacity(#{rate := Rate, burst := Burst}) ->
%% 1. all infinity capacity will change to infinity_value erlang:floor(1000 * Rate / default_period()) + Burst.
%% 2. if the rate of global and bucket both are `infinity`,
%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2`
infinity_value() ->
%% 1 TB
1099511627776.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
@ -335,7 +361,7 @@ to_quota(Str, Regex) ->
{match, [Quota, ""]} -> {match, [Quota, ""]} ->
{ok, erlang:list_to_integer(Quota)}; {ok, erlang:list_to_integer(Quota)};
{match, ""} -> {match, ""} ->
{ok, infinity_value()}; {ok, infinity};
_ -> _ ->
{error, Str} {error, Str}
end end
@ -350,7 +376,8 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
bucket_field(Type) when is_atom(Type) -> %% A bucket with only one type
simple_bucket_field(Type) when is_atom(Type) ->
fields(bucket_infinity) ++ fields(bucket_infinity) ++
[ [
{client, {client,
@ -358,16 +385,22 @@ bucket_field(Type) when is_atom(Type) ->
?R_REF(?MODULE, client_opts), ?R_REF(?MODULE, client_opts),
#{ #{
desc => ?DESC(client), desc => ?DESC(client),
required => false required => false,
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
} }
)} )}
]. ].
bucket_fields(Types, ClientRef) ->
%% A bucket with multi types
composite_bucket_fields(Types, ClientRef) ->
[ [
{Type, {Type,
?HOCON(?R_REF(?MODULE, Opts), #{ ?HOCON(?R_REF(?MODULE, Opts), #{
desc => ?DESC(?MODULE, Type), desc => ?DESC(?MODULE, Type),
required => false required => false,
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})} })}
|| {Type, Opts} <- Types || {Type, Opts} <- Types
] ++ ] ++
@ -382,12 +415,29 @@ bucket_fields(Types, ClientRef) ->
)} )}
]. ].
client_fields(Types) -> client_fields(Types, Meta) ->
[ [
{Type, {Type,
?HOCON(?R_REF(client_opts), #{ ?HOCON(?R_REF(client_opts), Meta#{
desc => ?DESC(Type), desc => ?DESC(Type),
required => false importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})} })}
|| {Type, _} <- Types || Type <- Types
]. ].
importance_of_type(interval) ->
?IMPORTANCE_HIDDEN;
importance_of_type(message_routing) ->
?IMPORTANCE_HIDDEN;
importance_of_type(connection) ->
?IMPORTANCE_HIDDEN;
importance_of_type(_) ->
?DEFAULT_IMPORTANCE.
alias_of_type(messages) ->
[message_in];
alias_of_type(bytess) ->
[bytes_in];
alias_of_type(_) ->
[].

View File

@ -118,17 +118,24 @@ connect(_Id, _Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, emqx_htb_limiter:make_infinity_limiter()};
connect(Id, Type, Cfg) -> connect(Id, Type, Cfg) ->
case find_limiter_cfg(Type, Cfg) of case find_limiter_cfg(Type, Cfg) of
{undefined, _} -> {_ClientCfg, undefined, _NodeCfg} ->
{ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, emqx_htb_limiter:make_infinity_limiter()};
{#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{ClientCfg, #{rate := infinity}, #{rate := infinity}} ->
{ok,
emqx_htb_limiter:make_token_bucket_limiter(
ClientCfg, emqx_limiter_bucket_ref:infinity_bucket()
)};
{ {
#{ #{rate := CliRate} = ClientCfg,
rate := BucketRate, #{rate := BucketRate} = BucketCfg,
capacity := BucketSize _
},
#{rate := CliRate, capacity := CliSize} = ClientCfg
} -> } ->
case emqx_limiter_manager:find_bucket(Id, Type) of case emqx_limiter_manager:find_bucket(Id, Type) of
{ok, Bucket} -> {ok, Bucket} ->
BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg),
CliSize = emqx_limiter_schema:calc_capacity(ClientCfg),
{ok, {ok,
if if
CliRate < BucketRate orelse CliSize < BucketSize -> CliRate < BucketRate orelse CliSize < BucketSize ->
@ -493,12 +500,14 @@ make_root(#{rate := Rate, burst := Burst}) ->
produced => 0.0 produced => 0.0
}. }.
do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) -> do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
State;
do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
case maps:get(Id, Buckets, undefined) of case maps:get(Id, Buckets, undefined) of
undefined -> undefined ->
make_bucket(Id, Cfg, State); make_bucket(Id, Cfg, State);
Bucket -> Bucket ->
Bucket2 = Bucket#{rate := Rate, capacity := Capacity}, Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)},
State#{buckets := Buckets#{Id := Bucket2}} State#{buckets := Buckets#{Id := Bucket2}}
end. end.
@ -509,7 +518,7 @@ make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) ->
}); });
make_bucket( make_bucket(
Id, Id,
#{rate := Rate, capacity := Capacity} = Cfg, #{rate := Rate} = Cfg,
#{type := Type, counter := Counter, index := Index, buckets := Buckets} = State #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
) -> ) ->
NewIndex = Index + 1, NewIndex = Index + 1,
@ -519,7 +528,7 @@ make_bucket(
rate => Rate, rate => Rate,
obtained => Initial, obtained => Initial,
correction => 0, correction => 0,
capacity => Capacity, capacity => emqx_limiter_schema:calc_capacity(Cfg),
counter => Counter, counter => Counter,
index => NewIndex index => NewIndex
}, },
@ -541,19 +550,14 @@ do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
get_initial_val( get_initial_val(
#{ #{
initial := Initial, initial := Initial,
rate := Rate, rate := Rate
capacity := Capacity
} }
) -> ) ->
%% initial will nevner be infinity(see the emqx_limiter_schema)
InfVal = emqx_limiter_schema:infinity_value(),
if if
Initial > 0 -> Initial > 0 ->
Initial; Initial;
Rate =/= infinity -> Rate =/= infinity ->
erlang:min(Rate, Capacity); Rate;
Capacity =/= infinity andalso Capacity =/= InfVal ->
Capacity;
true -> true ->
0 0
end. end.
@ -568,11 +572,12 @@ call(Type, Msg) ->
end. end.
find_limiter_cfg(Type, #{rate := _} = Cfg) -> find_limiter_cfg(Type, #{rate := _} = Cfg) ->
{Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; {find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)};
find_limiter_cfg(Type, Cfg) -> find_limiter_cfg(Type, Cfg) ->
{ {
find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)),
maps:get(Type, Cfg, undefined), maps:get(Type, Cfg, undefined),
find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)) find_node_cfg(Type)
}. }.
find_client_cfg(Type, BucketCfg) -> find_client_cfg(Type, BucketCfg) ->
@ -585,3 +590,6 @@ merge_client_cfg(NodeCfg, undefined) ->
NodeCfg; NodeCfg;
merge_client_cfg(NodeCfg, BucketCfg) -> merge_client_cfg(NodeCfg, BucketCfg) ->
maps:merge(NodeCfg, BucketCfg). maps:merge(NodeCfg, BucketCfg).
find_node_cfg(Type) ->
emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}).

View File

@ -121,8 +121,8 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(ENABLED(X), (X =/= undefined)). -define(ENABLED(X), (X =/= undefined)).
-define(LIMITER_BYTES_IN, bytes_in). -define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_MESSAGE_IN, message_in). -define(LIMITER_MESSAGE_IN, messages).
-dialyzer({no_match, [info/2]}). -dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [websocket_init/1]}). -dialyzer({nowarn_function, [websocket_init/1]}).

View File

@ -72,7 +72,7 @@ t_consume(_) ->
Cfg = fun(Cfg) -> Cfg = fun(Cfg) ->
Cfg#{ Cfg#{
rate := 100, rate := 100,
capacity := 100, burst := 0,
initial := 100, initial := 100,
max_retry_time := 1000, max_retry_time := 1000,
failure_strategy := force failure_strategy := force
@ -89,7 +89,7 @@ t_retry(_) ->
Cfg = fun(Cfg) -> Cfg = fun(Cfg) ->
Cfg#{ Cfg#{
rate := 50, rate := 50,
capacity := 200, burst := 150,
initial := 0, initial := 0,
max_retry_time := 1000, max_retry_time := 1000,
failure_strategy := force failure_strategy := force
@ -109,7 +109,7 @@ t_restore(_) ->
Cfg = fun(Cfg) -> Cfg = fun(Cfg) ->
Cfg#{ Cfg#{
rate := 1, rate := 1,
capacity := 200, burst := 199,
initial := 50, initial := 50,
max_retry_time := 100, max_retry_time := 100,
failure_strategy := force failure_strategy := force
@ -129,7 +129,7 @@ t_max_retry_time(_) ->
Cfg = fun(Cfg) -> Cfg = fun(Cfg) ->
Cfg#{ Cfg#{
rate := 1, rate := 1,
capacity := 1, burst := 0,
max_retry_time := 500, max_retry_time := 500,
failure_strategy := drop failure_strategy := drop
} }
@ -139,8 +139,12 @@ t_max_retry_time(_) ->
Begin = ?NOW, Begin = ?NOW,
Result = emqx_htb_limiter:consume(101, Client), Result = emqx_htb_limiter:consume(101, Client),
?assertMatch({drop, _}, Result), ?assertMatch({drop, _}, Result),
Time = ?NOW - Begin, End = ?NOW,
?assert(Time >= 500 andalso Time < 550) Time = End - Begin,
?assert(
Time >= 500 andalso Time < 550,
lists:flatten(io_lib:format("Begin:~p, End:~p, Time:~p~n", [Begin, End, Time]))
)
end, end,
with_per_client(Cfg, Case). with_per_client(Cfg, Case).
@ -150,7 +154,7 @@ t_divisible(_) ->
divisible := true, divisible := true,
rate := ?RATE("1000/1s"), rate := ?RATE("1000/1s"),
initial := 600, initial := 600,
capacity := 600 burst := 0
} }
end, end,
Case = fun(BucketCfg) -> Case = fun(BucketCfg) ->
@ -176,7 +180,7 @@ t_low_watermark(_) ->
low_watermark := 400, low_watermark := 400,
rate := ?RATE("1000/1s"), rate := ?RATE("1000/1s"),
initial := 1000, initial := 1000,
capacity := 1000 burst := 0
} }
end, end,
Case = fun(BucketCfg) -> Case = fun(BucketCfg) ->
@ -201,8 +205,7 @@ t_infinity_client(_) ->
Fun = fun(Cfg) -> Cfg end, Fun = fun(Cfg) -> Cfg end,
Case = fun(Cfg) -> Case = fun(Cfg) ->
Client = connect(Cfg), Client = connect(Cfg),
InfVal = emqx_limiter_schema:infinity_value(), ?assertMatch(infinity, Client),
?assertMatch(#{bucket := #{rate := InfVal}}, Client),
Result = emqx_htb_limiter:check(100000, Client), Result = emqx_htb_limiter:check(100000, Client),
?assertEqual({ok, Client}, Result) ?assertEqual({ok, Client}, Result)
end, end,
@ -212,12 +215,12 @@ t_try_restore_agg(_) ->
Fun = fun(#{client := Cli} = Bucket) -> Fun = fun(#{client := Cli} = Bucket) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := 1, rate := 1,
capacity := 200, burst := 199,
initial := 50 initial := 50
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := infinity, rate := infinity,
capacity := infinity, burst := infinity,
divisible := true, divisible := true,
max_retry_time := 100, max_retry_time := 100,
failure_strategy := force failure_strategy := force
@ -239,11 +242,11 @@ t_short_board(_) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := ?RATE("100/1s"), rate := ?RATE("100/1s"),
initial := 0, initial := 0,
capacity := 100 burst := 0
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := ?RATE("600/1s"), rate := ?RATE("600/1s"),
capacity := 600, burst := 0,
initial := 600 initial := 600
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
@ -261,46 +264,45 @@ t_rate(_) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := ?RATE("100/100ms"), rate := ?RATE("100/100ms"),
initial := 0, initial := 0,
capacity := infinity burst := infinity
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := infinity, rate := infinity,
capacity := infinity, burst := infinity,
initial := 0 initial := 0
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
end, end,
Case = fun(Cfg) -> Case = fun(Cfg) ->
Time = 1000,
Client = connect(Cfg), Client = connect(Cfg),
Ts1 = erlang:system_time(millisecond),
C1 = emqx_htb_limiter:available(Client), C1 = emqx_htb_limiter:available(Client),
timer:sleep(1000), timer:sleep(1100),
Ts2 = erlang:system_time(millisecond),
C2 = emqx_htb_limiter:available(Client), C2 = emqx_htb_limiter:available(Client),
ShouldInc = floor((Ts2 - Ts1) / 100) * 100, ShouldInc = floor(Time / 100) * 100,
Inc = C2 - C1, Inc = C2 - C1,
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate") ?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
end, end,
with_bucket(Fun, Case). with_bucket(Fun, Case).
t_capacity(_) -> t_capacity(_) ->
Capacity = 600, Capacity = 1200,
Fun = fun(#{client := Cli} = Bucket) -> Fun = fun(#{client := Cli} = Bucket) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := ?RATE("100/100ms"), rate := ?RATE("100/100ms"),
initial := 0, initial := 0,
capacity := 600 burst := 200
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := infinity, rate := infinity,
capacity := infinity, burst := infinity,
initial := 0 initial := 0
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
end, end,
Case = fun(Cfg) -> Case = fun(Cfg) ->
Client = connect(Cfg), Client = connect(Cfg),
timer:sleep(1000), timer:sleep(1500),
C1 = emqx_htb_limiter:available(Client), C1 = emqx_htb_limiter:available(Client),
?assertEqual(Capacity, C1, "test bucket capacity") ?assertEqual(Capacity, C1, "test bucket capacity")
end, end,
@ -318,11 +320,11 @@ t_collaborative_alloc(_) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := ?RATE("400/1s"), rate := ?RATE("400/1s"),
initial := 0, initial := 0,
capacity := 600 burst := 200
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := ?RATE("50"), rate := ?RATE("50"),
capacity := 100, burst := 50,
initial := 100 initial := 100
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
@ -363,11 +365,11 @@ t_burst(_) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := ?RATE("200/1s"), rate := ?RATE("200/1s"),
initial := 0, initial := 0,
capacity := 200 burst := 0
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := ?RATE("50/1s"), rate := ?RATE("50/1s"),
capacity := 200, burst := 150,
divisible := true divisible := true
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
@ -401,11 +403,11 @@ t_limit_global_with_unlimit_other(_) ->
Bucket2 = Bucket#{ Bucket2 = Bucket#{
rate := infinity, rate := infinity,
initial := 0, initial := 0,
capacity := infinity burst := infinity
}, },
Cli2 = Cli#{ Cli2 = Cli#{
rate := infinity, rate := infinity,
capacity := infinity, burst := infinity,
initial := 0 initial := 0
}, },
Bucket2#{client := Cli2} Bucket2#{client := Cli2}
@ -414,7 +416,7 @@ t_limit_global_with_unlimit_other(_) ->
Case = fun() -> Case = fun() ->
C1 = counters:new(1, []), C1 = counters:new(1, []),
start_client({b1, Bucket}, ?NOW + 2000, C1, 20), start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
timer:sleep(2100), timer:sleep(2200),
check_average_rate(C1, 2, 600) check_average_rate(C1, 2, 600)
end, end,
@ -432,7 +434,7 @@ t_check_container(_) ->
Cfg#{ Cfg#{
rate := ?RATE("1000/1s"), rate := ?RATE("1000/1s"),
initial := 1000, initial := 1000,
capacity := 1000 burst := 0
} }
end, end,
Case = fun(#{client := Client} = BucketCfg) -> Case = fun(#{client := Client} = BucketCfg) ->
@ -565,7 +567,7 @@ t_schema_unit(_) ->
?assertMatch({error, _}, M:to_rate("100MB/1")), ?assertMatch({error, _}, M:to_rate("100MB/1")),
?assertMatch({error, _}, M:to_rate("100/10x")), ?assertMatch({error, _}, M:to_rate("100/10x")),
?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")), ?assertEqual({ok, infinity}, M:to_capacity("infinity")),
?assertEqual({ok, 100}, M:to_capacity("100")), ?assertEqual({ok, 100}, M:to_capacity("100")),
?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
@ -748,17 +750,16 @@ connect(Name, Cfg) ->
Limiter. Limiter.
make_limiter_cfg() -> make_limiter_cfg() ->
Infinity = emqx_limiter_schema:infinity_value(),
Client = #{ Client = #{
rate => Infinity, rate => infinity,
initial => 0, initial => 0,
capacity => Infinity, burst => infinity,
low_watermark => 0, low_watermark => 0,
divisible => false, divisible => false,
max_retry_time => timer:seconds(5), max_retry_time => timer:seconds(5),
failure_strategy => force failure_strategy => force
}, },
#{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. #{client => Client, rate => infinity, initial => 0, burst => infinity}.
add_bucket(Cfg) -> add_bucket(Cfg) ->
add_bucket(?MODULE, Cfg). add_bucket(?MODULE, Cfg).

View File

@ -33,28 +33,6 @@ emqx_limiter_schema {
} }
} }
client_bucket_capacity {
desc {
en: """The capacity of per user."""
zh: """每个使用者的令牌容量上限"""
}
label: {
en: """Capacity"""
zh: """容量"""
}
}
capacity {
desc {
en: """The capacity of this token bucket."""
zh: """该令牌桶的容量"""
}
label: {
en: """Capacity"""
zh: """容量"""
}
}
low_watermark { low_watermark {
desc { desc {
en: """If the remaining tokens are lower than this value, en: """If the remaining tokens are lower than this value,
@ -152,30 +130,30 @@ Once the limit is reached, new connections will be refused"""
} }
} }
message_in { messages {
desc { desc {
en: """The message in limiter. en: """The messages limiter.
This is used to limit the inbound message numbers for this EMQX node This is used to limit the inbound message numbers for this EMQX node
Once the limit is reached, the restricted client will be slow down even be hung for a while.""" Once the limit is reached, the restricted client will be slow down even be hung for a while."""
zh: """流入速率控制器。 zh: """流入速率控制器。
这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" 这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
} }
label: { label: {
en: """Message In""" en: """Messages"""
zh: """消息流入速率""" zh: """消息流入速率"""
} }
} }
bytes_in { bytes {
desc { desc {
en: """The bytes_in limiter. en: """The bytes limiter.
This is used to limit the inbound bytes rate for this EMQX node. This is used to limit the inbound bytes rate for this EMQX node.
Once the limit is reached, the restricted client will be slow down even be hung for a while.""" Once the limit is reached, the restricted client will be slow down even be hung for a while."""
zh: """流入字节率控制器。 zh: """流入字节率控制器。
这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间""" 这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
} }
label: { label: {
en: """Bytes In""" en: """Bytes"""
zh: """流入字节率""" zh: """流入字节率"""
} }
} }