Merge pull request #7311 from lafirest/fix/limit_schema

feat(limiter): simplify the implementation and configuration
This commit is contained in:
lafirest 2022-03-18 16:40:54 +08:00 committed by GitHub
commit ba450c8000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 529 additions and 770 deletions

View File

@ -6,50 +6,40 @@ limiter {
## rate limiter for message publish ## rate limiter for message publish
bytes_in { bytes_in {
bucket.default { bucket.default {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
## rate limiter for message publish ## rate limiter for message publish
message_in { message_in {
bucket.default { bucket.default {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
## connection rate limiter ## connection rate limiter
connection { connection {
bucket.default { bucket.default {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
## rate limiter for message deliver ## rate limiter for message deliver
message_routing { message_routing {
bucket.default { bucket.default {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
## Some functions that don't need to use global and zone scope, them can shared use this type ## rate limiter for internal batch operation
shared { batch {
bucket.retainer { bucket.retainer {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
} }

View File

@ -22,24 +22,28 @@
%% API %% API
-export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2 -export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2
, consume/2, set_retry/2, retry/1, make_infinity_limiter/1 , consume/2, set_retry/2, retry/1, make_infinity_limiter/0
, make_future/1, available/1 , make_future/1, available/1
]). ]).
-export_type([token_bucket_limiter/0]). -export_type([token_bucket_limiter/0]).
%% a token bucket limiter with a limiter server's bucket reference %% a token bucket limiter with a limiter server's bucket reference
-type token_bucket_limiter() :: #{ tokens := non_neg_integer() %% the number of tokens currently available -type token_bucket_limiter() :: #{ %% the number of tokens currently available
tokens := non_neg_integer()
, rate := decimal() , rate := decimal()
, capacity := decimal() , capacity := decimal()
, lasttime := millisecond() , lasttime := millisecond()
, max_retry_time := non_neg_integer() %% @see emqx_limiter_schema %% @see emqx_limiter_schema
, failure_strategy := failure_strategy() %% @see emqx_limiter_schema , max_retry_time := non_neg_integer()
%% @see emqx_limiter_schema
, failure_strategy := failure_strategy()
, divisible := boolean() %% @see emqx_limiter_schema , divisible := boolean() %% @see emqx_limiter_schema
, low_water_mark := non_neg_integer() %% @see emqx_limiter_schema , low_water_mark := non_neg_integer() %% @see emqx_limiter_schema
, bucket := bucket() %% the limiter server's bucket , bucket := bucket() %% the limiter server's bucket
%% retry contenxt %% retry contenxt
, retry_ctx => undefined %% undefined meaning there is no retry context or no need to retry %% undefined meaning no retry context or no need to retry
, retry_ctx => undefined
| retry_context(token_bucket_limiter()) %% the retry context | retry_context(token_bucket_limiter()) %% the retry context
, atom => any() %% allow to add other keys , atom => any() %% allow to add other keys
}. }.
@ -119,8 +123,8 @@ make_token_bucket_limiter(Cfg, Bucket) ->
make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity -> make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity ->
Cfg#{bucket => Bucket}. Cfg#{bucket => Bucket}.
-spec make_infinity_limiter(limiter_bucket_cfg()) -> infinity. -spec make_infinity_limiter() -> infinity.
make_infinity_limiter(_) -> make_infinity_limiter() ->
infinity. infinity.
%% @doc request some tokens %% @doc request some tokens
@ -247,12 +251,11 @@ try_consume(_, _, Limiter) ->
-spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) -spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter)
when Limiter :: limiter(). when Limiter :: limiter().
do_check(Need, #{tokens := Tokens} = Limiter) -> do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens ->
if Need =< Tokens -> do_check_with_parent_limiter(Need, Limiter);
do_check_with_parent_limiter(Need, Limiter);
true -> do_check(Need, #{tokens := _} = Limiter) ->
do_reset(Need, Limiter) do_reset(Need, Limiter);
end;
do_check(Need, #{divisible := Divisible, do_check(Need, #{divisible := Divisible,
bucket := Bucket} = Ref) -> bucket := Bucket} = Ref) ->
@ -275,7 +278,8 @@ on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]), Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}). erlang:throw({rate_check_fail, Message}).
-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()). -spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
inner_check_result(token_bucket_limiter()).
do_check_with_parent_limiter(Need, do_check_with_parent_limiter(Need,
#{tokens := Tokens, #{tokens := Tokens,
divisible := Divisible, divisible := Divisible,
@ -301,17 +305,19 @@ do_reset(Need,
capacity := Capacity} = Limiter) -> capacity := Capacity} = Limiter) ->
Now = ?NOW, Now = ?NOW,
Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
if Tokens2 >= Need ->
case erlang:floor(Tokens2) of
Available when Available >= Need ->
Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now}, Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now},
do_check_with_parent_limiter(Need, Limiter2); do_check_with_parent_limiter(Need, Limiter2);
Divisible andalso Tokens2 > 0 -> Available when Divisible andalso Available > 0 ->
%% must be allocated here, because may be Need > Capacity %% must be allocated here, because may be Need > Capacity
return_pause(Rate, return_pause(Rate,
partial, partial,
fun do_reset/2, fun do_reset/2,
Need - Tokens2, Need - Available,
Limiter#{tokens := 0, lasttime := Now}); Limiter#{tokens := 0, lasttime := Now});
true -> _ ->
return_pause(Rate, pause, fun do_reset/2, Need, Limiter) return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
end. end.
@ -326,8 +332,8 @@ return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
{PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> retry_context(Limiter) -spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) ->
when Limiter :: limiter(). retry_context(Limiter) when Limiter :: limiter().
make_retry_context(Fun, Diff) -> make_retry_context(Fun, Diff) ->
#{continuation => Fun, diff => Diff}. #{continuation => Fun, diff => Diff}.

View File

@ -39,8 +39,7 @@
{ok, Pid :: pid(), State :: term()} | {ok, Pid :: pid(), State :: term()} |
{error, Reason :: term()}. {error, Reason :: term()}.
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, _} = Result = emqx_limiter_sup:start_link(), {ok, _} = emqx_limiter_sup:start_link().
Result.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private

View File

@ -21,7 +21,7 @@
%% @end %% @end
%% API %% API
-export([ new/0, new/1, get_limiter_by_names/2 -export([ new/0, new/1, new/2, get_limiter_by_names/2
, add_new/3, update_by_name/3, set_retry_context/2 , add_new/3, update_by_name/3, set_retry_context/2
, check/3, retry/2, get_retry_context/1 , check/3, retry/2, get_retry_context/1
, check_list/2, retry_list/2 , check_list/2, retry_list/2
@ -30,7 +30,10 @@
-export_type([container/0, check_result/0]). -export_type([container/0, check_result/0]).
-type container() :: #{ limiter_type() => undefined | limiter() -type container() :: #{ limiter_type() => undefined | limiter()
, retry_key() => undefined | retry_context() | future() %% the retry context of the limiter %% the retry context of the limiter
, retry_key() => undefined
| retry_context()
| future()
, retry_ctx := undefined | any() %% the retry context of the container , retry_ctx := undefined | any() %% the retry context of the container
}. }.
@ -57,12 +60,18 @@ new() ->
%% @doc generate default data according to the type of limiter %% @doc generate default data according to the type of limiter
-spec new(list(limiter_type())) -> container(). -spec new(list(limiter_type())) -> container().
new(Types) -> new(Types) ->
get_limiter_by_names(Types, #{}). new(Types, #{}).
-spec new(list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
new(Types, Names) ->
get_limiter_by_names(Types, Names).
%% @doc generate a container %% @doc generate a container
%% according to the type of limiter and the bucket name configuration of the limiter %% according to the type of limiter and the bucket name configuration of the limiter
%% @end %% @end
-spec get_limiter_by_names(list(limiter_type()), #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). -spec get_limiter_by_names(list(limiter_type()),
#{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
get_limiter_by_names(Types, BucketNames) -> get_limiter_by_names(Types, BucketNames) ->
Init = fun(Type, Acc) -> Init = fun(Type, Acc) ->
Limiter = emqx_limiter_server:connect(Type, BucketNames), Limiter = emqx_limiter_server:connect(Type, BucketNames),

View File

@ -23,8 +23,9 @@
%% API %% API
-export([ start_link/0, start_server/1, find_bucket/1 -export([ start_link/0, start_server/1, find_bucket/1
, find_bucket/3, insert_bucket/2, insert_bucket/4 , find_bucket/2, insert_bucket/2, insert_bucket/3
, make_path/3, restart_server/1]). , make_path/2, restart_server/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -34,7 +35,6 @@
-type path() :: list(atom()). -type path() :: list(atom()).
-type limiter_type() :: emqx_limiter_schema:limiter_type(). -type limiter_type() :: emqx_limiter_schema:limiter_type().
-type zone_name() :: emqx_limiter_schema:zone_name().
-type bucket_name() :: emqx_limiter_schema:bucket_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name().
%% counter record in ets table %% counter record in ets table
@ -57,10 +57,10 @@ start_server(Type) ->
restart_server(Type) -> restart_server(Type) ->
emqx_limiter_server_sup:restart(Type). emqx_limiter_server_sup:restart(Type).
-spec find_bucket(limiter_type(), zone_name(), bucket_name()) -> -spec find_bucket(limiter_type(), bucket_name()) ->
{ok, bucket_ref()} | undefined. {ok, bucket_ref()} | undefined.
find_bucket(Type, Zone, BucketId) -> find_bucket(Type, BucketName) ->
find_bucket(make_path(Type, Zone, BucketId)). find_bucket(make_path(Type, BucketName)).
-spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
find_bucket(Path) -> find_bucket(Path) ->
@ -72,21 +72,19 @@ find_bucket(Path) ->
end. end.
-spec insert_bucket(limiter_type(), -spec insert_bucket(limiter_type(),
zone_name(),
bucket_name(), bucket_name(),
bucket_ref()) -> boolean(). bucket_ref()) -> boolean().
insert_bucket(Type, Zone, BucketId, Bucket) -> insert_bucket(Type, BucketName, Bucket) ->
inner_insert_bucket(make_path(Type, Zone, BucketId), inner_insert_bucket(make_path(Type, BucketName), Bucket).
Bucket).
-spec insert_bucket(path(), bucket_ref()) -> true. -spec insert_bucket(path(), bucket_ref()) -> true.
insert_bucket(Path, Bucket) -> insert_bucket(Path, Bucket) ->
inner_insert_bucket(Path, Bucket). inner_insert_bucket(Path, Bucket).
-spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). -spec make_path(limiter_type(), bucket_name()) -> path().
make_path(Type, Name, BucketId) -> make_path(Type, BucketName) ->
[Type, Name, BucketId]. [Type | BucketName].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc

View File

@ -20,7 +20,8 @@
-export([ roots/0, fields/1, to_rate/1, to_capacity/1 -export([ roots/0, fields/1, to_rate/1, to_capacity/1
, minimum_period/0, to_burst_rate/1, to_initial/1 , minimum_period/0, to_burst_rate/1, to_initial/1
, namespace/0]). , namespace/0, get_bucket_cfg_path/2
]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
@ -28,14 +29,14 @@
| message_in | message_in
| connection | connection
| message_routing | message_routing
| shared. | batch.
-type bucket_name() :: atom(). -type bucket_name() :: atom().
-type zone_name() :: atom().
-type rate() :: infinity | float(). -type rate() :: infinity | float().
-type burst_rate() :: 0 | float(). -type burst_rate() :: 0 | float().
-type capacity() :: infinity | number(). %% the capacity of the token bucket -type capacity() :: infinity | number(). %% the capacity of the token bucket
-type initial() :: non_neg_integer(). %% initial capacity of the token bucket -type initial() :: non_neg_integer(). %% initial capacity of the token bucket
-type bucket_path() :: list(atom()).
%% the processing strategy after the failure of the token request %% the processing strategy after the failure of the token request
-type failure_strategy() :: force %% Forced to pass -type failure_strategy() :: force %% Forced to pass
@ -52,58 +53,80 @@
, capacity/0 , capacity/0
, initial/0 , initial/0
, failure_strategy/0 , failure_strategy/0
, bucket_name/0
]). ]).
-export_type([limiter_type/0, bucket_name/0, zone_name/0]). -export_type([limiter_type/0, bucket_path/0]).
-import(emqx_schema, [sc/2, map/2]). -import(emqx_schema, [sc/2, map/2]).
-define(UNIT_TIME_IN_MS, 1000).
namespace() -> limiter. namespace() -> limiter.
roots() -> [limiter]. roots() -> [limiter].
fields(limiter) -> fields(limiter) ->
[ {bytes_in, sc(ref(limiter_opts), #{})} [ {bytes_in, sc(ref(limiter_opts),
, {message_in, sc(ref(limiter_opts), #{})} #{description =>
, {connection, sc(ref(limiter_opts), #{})} <<"The bytes_in limiter.<br>"
, {message_routing, sc(ref(limiter_opts), #{})} "It is used to limit the inbound bytes rate for this EMQX node."
, {shared, sc(ref(shared_limiter_opts), "If the this limiter limit is reached,"
#{description => "the restricted client will be slow down even be hung for a while.">>
<<"Some functions that do not need to use global and zone scope," })}
"them can shared use this type">>})} , {message_in, sc(ref(limiter_opts),
#{description =>
<<"The message_in limiter.<br>"
"This is used to limit the inbound message numbers for this EMQX node"
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while.">>
})}
, {connection, sc(ref(limiter_opts),
#{description =>
<<"The connection limiter.<br>"
"This is used to limit the connection rate for this EMQX node"
"If the this limiter limit is reached,"
"New connections will be refused"
>>})}
, {message_routing, sc(ref(limiter_opts),
#{description =>
<<"The message_routing limiter.<br>"
"This is used to limite the deliver rate for this EMQX node"
"If the this limiter limit is reached,"
"New publish will be refused"
>>
})}
, {batch, sc(ref(limiter_opts),
#{description => <<"The batch limiter.<br>"
"This is used for EMQX internal batch operation"
"e.g. limite the retainer's deliver rate"
>>
})}
]; ];
fields(limiter_opts) -> fields(limiter_opts) ->
[ {global, sc(ref(rate_burst), #{required => false})} [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}
, {zone, sc(map("zone name", ref(rate_burst)), #{required => false})} , {burst, sc(burst_rate(),
, {bucket, sc(map("bucket_id", ref(bucket)), #{default => "0/0s",
#{desc => "Token bucket"})} desc => "The burst, This value is based on rate."
"this value + rate = the maximum limit that can be achieved when limiter burst"
})}
, {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
]; ];
fields(shared_limiter_opts) -> fields(bucket_opts) ->
[{bucket, sc(map("bucket_id", ref(bucket)), [ {rate, sc(rate(), #{desc => "The rate for this bucket"})}
#{desc => "Token bucket"})} , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})}
]; , {initial, sc(initial(), #{default => "0",
desc => "The initial number of tokens for this bucket"})}
fields(rate_burst) -> , {per_client, sc(ref(client_bucket),
[ {rate, sc(rate(), #{})} #{default => #{},
, {burst, sc(burst_rate(), #{default => "0/0s"})} desc => "The rate limit for each user of the bucket,"
]; "this field is not required"
})}
fields(bucket) ->
[ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})}
, {aggregated, sc(ref(bucket_aggregated), #{})}
, {per_client, sc(ref(client_bucket), #{})}
];
fields(bucket_aggregated) ->
[ {rate, sc(rate(), #{})}
, {initial, sc(initial(), #{default => "0"})}
, {capacity, sc(capacity(), #{})}
]; ];
fields(client_bucket) -> fields(client_bucket) ->
[ {rate, sc(rate(), #{})} [ {rate, sc(rate(), #{default => "infinity"})}
, {initial, sc(initial(), #{default => "0"})} , {initial, sc(initial(), #{default => "0"})}
%% low_water_mark add for emqx_channel and emqx_session %% low_water_mark add for emqx_channel and emqx_session
%% both modules consume first and then check %% both modules consume first and then check
@ -113,13 +136,14 @@ fields(client_bucket) ->
#{desc => "If the remaining tokens are lower than this value, #{desc => "If the remaining tokens are lower than this value,
the check/consume will succeed, but it will be forced to wait for a short period of time.", the check/consume will succeed, but it will be forced to wait for a short period of time.",
default => "0"})} default => "0"})}
, {capacity, sc(capacity(), #{desc => "The capacity of the token bucket."})} , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.",
default => "infinity"})}
, {divisible, sc(boolean(), , {divisible, sc(boolean(),
#{desc => "Is it possible to split the number of requested tokens?", #{desc => "Is it possible to split the number of requested tokens?",
default => false})} default => false})}
, {max_retry_time, sc(emqx_schema:duration(), , {max_retry_time, sc(emqx_schema:duration(),
#{ desc => "The maximum retry time when acquire failed." #{ desc => "The maximum retry time when acquire failed."
, default => "5s"})} , default => "10s"})}
, {failure_strategy, sc(failure_strategy(), , {failure_strategy, sc(failure_strategy(),
#{ desc => "The strategy when all the retries failed." #{ desc => "The strategy when all the retries failed."
, default => force})} , default => force})}
@ -132,6 +156,10 @@ minimum_period() ->
to_rate(Str) -> to_rate(Str) ->
to_rate(Str, true, false). to_rate(Str, true, false).
-spec get_bucket_cfg_path(limiter_type(), bucket_name()) -> bucket_path().
get_bucket_cfg_path(Type, BucketName) ->
[limiter, Type, bucket, BucketName].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -145,22 +173,38 @@ to_rate(Str, CanInfinity, CanZero) ->
case Tokens of case Tokens of
["infinity"] when CanInfinity -> ["infinity"] when CanInfinity ->
{ok, infinity}; {ok, infinity};
["0", _] when CanZero -> [QuotaStr] -> %% if time unit is 1s, it can be omitted
{ok, 0}; %% for burst {ok, Val} = to_capacity(QuotaStr),
[Quota, Interval] -> check_capacity(Str, Val, CanZero,
{ok, Val} = to_capacity(Quota), fun(Quota) ->
case emqx_schema:to_duration_ms(Interval) of {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
{ok, Ms} when Ms > 0 -> end);
{ok, Val * minimum_period() / Ms}; [QuotaStr, Interval] ->
_ -> {ok, Val} = to_capacity(QuotaStr),
{error, Str} check_capacity(Str, Val, CanZero,
end; fun(Quota) ->
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, Quota * minimum_period() / Ms};
_ ->
{error, Str}
end
end);
_ -> _ ->
{error, Str} {error, Str}
end. end.
check_capacity(_Str, 0, true, _Cont) ->
{ok, 0};
check_capacity(Str, 0, false, _Cont) ->
{error, Str};
check_capacity(_Str, Quota, _CanZero, Cont) ->
Cont(Quota).
to_capacity(Str) -> to_capacity(Str) ->
Regex = "^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$", Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$",
to_quota(Str, Regex). to_quota(Str, Regex).
to_initial(Str) -> to_initial(Str) ->
@ -175,9 +219,9 @@ to_quota(Str, Regex) ->
Val = erlang:list_to_integer(Quota), Val = erlang:list_to_integer(Quota),
Unit2 = string:to_lower(Unit), Unit2 = string:to_lower(Unit),
{ok, apply_unit(Unit2, Val)}; {ok, apply_unit(Unit2, Val)};
{match, [Quota]} -> {match, [Quota, ""]} ->
{ok, erlang:list_to_integer(Quota)}; {ok, erlang:list_to_integer(Quota)};
{match, []} -> {match, ""} ->
{ok, infinity}; {ok, infinity};
_ -> _ ->
{error, Str} {error, Str}

View File

@ -34,26 +34,16 @@
terminate/2, code_change/3, format_status/2]). terminate/2, code_change/3, format_status/2]).
-export([ start_link/1, connect/2, info/1 -export([ start_link/1, connect/2, info/1
, name/1, get_initial_val/1]). , name/1, get_initial_val/1, update_config/1
]).
-type root() :: #{ rate := rate() %% number of tokens generated per period -type root() :: #{ rate := rate() %% number of tokens generated per period
, burst := rate() , burst := rate()
, period := pos_integer() %% token generation interval(second) , period := pos_integer() %% token generation interval(second)
, childs := list(node_id()) %% node children
, consumed := non_neg_integer() , consumed := non_neg_integer()
}. }.
-type zone() :: #{ id := node_id() -type bucket() :: #{ name := bucket_name()
, name := zone_name()
, rate := rate()
, burst := rate()
, obtained := non_neg_integer() %% number of tokens obtained
, childs := list(node_id())
}.
-type bucket() :: #{ id := node_id()
, name := bucket_name()
, zone := zone_name() %% pointer to zone node, use for burst
, rate := rate() , rate := rate()
, obtained := non_neg_integer() , obtained := non_neg_integer()
, correction := emqx_limiter_decimal:zero_or_float() %% token correction value , correction := emqx_limiter_decimal:zero_or_float() %% token correction value
@ -62,19 +52,14 @@
, index := undefined | index() , index := undefined | index()
}. }.
-type state() :: #{ root := undefined | root() -type state() :: #{ type := limiter_type()
, root := undefined | root()
, buckets := buckets()
, counter := undefined | counters:counters_ref() %% current counter to alloc , counter := undefined | counters:counters_ref() %% current counter to alloc
, index := index() , index := index()
, zones := #{zone_name() => node_id()}
, buckets := list(node_id())
, nodes := nodes()
, type := limiter_type()
}. }.
-type node_id() :: pos_integer(). -type buckets() :: #{bucket_name() => bucket()}.
-type node_data() :: zone() | bucket().
-type nodes() :: #{node_id() => node_data()}.
-type zone_name() :: emqx_limiter_schema:zone_name().
-type limiter_type() :: emqx_limiter_schema:limiter_type(). -type limiter_type() :: emqx_limiter_schema:limiter_type().
-type bucket_name() :: emqx_limiter_schema:bucket_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name().
-type rate() :: decimal(). -type rate() :: decimal().
@ -85,6 +70,7 @@
-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
-define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
-export_type([index/0]). -export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@ -95,33 +81,38 @@
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec connect(limiter_type(), -spec connect(limiter_type(),
bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter(). bucket_name() | #{limiter_type() => bucket_name() | undefined}) ->
emqx_htb_limiter:limiter().
%% If no bucket path is set in config, there will be no limit
connect(_Type, undefined) ->
emqx_htb_limiter:make_infinity_limiter();
connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, BucketName) when is_atom(BucketName) ->
Path = [limiter, Type, bucket, BucketName], CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
case emqx:get_config(Path, undefined) of case emqx:get_config(CfgPath, undefined) of
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_config_not_found", path => Path}), ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}),
throw("bucket's config not found"); throw("bucket's config not found");
#{zone := Zone, #{rate := AggrRate,
aggregated := #{rate := AggrRate, capacity := AggrSize}, capacity := AggrSize,
per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
case emqx_limiter_manager:find_bucket(Type, Zone, BucketName) of case emqx_limiter_manager:find_bucket(Type, BucketName) of
{ok, Bucket} -> {ok, Bucket} ->
if CliRate < AggrRate orelse CliSize < AggrSize -> if CliRate < AggrRate orelse CliSize < AggrSize ->
emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
Bucket =:= infinity -> Bucket =:= infinity ->
emqx_htb_limiter:make_infinity_limiter(Cfg); emqx_htb_limiter:make_infinity_limiter();
true -> true ->
emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
end; end;
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_not_found", path => Path}), ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}),
throw("invalid bucket") throw("invalid bucket")
end end
end; end;
connect(Type, Names) -> connect(Type, Paths) ->
connect(Type, maps:get(Type, Names, default)). connect(Type, maps:get(Type, Paths, undefined)).
-spec info(limiter_type()) -> state(). -spec info(limiter_type()) -> state().
info(Type) -> info(Type) ->
@ -131,6 +122,10 @@ info(Type) ->
name(Type) -> name(Type) ->
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
-spec update_config(limiter_type()) -> ok.
update_config(Type) ->
?CALL(Type).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Starts the server %% Starts the server
@ -140,6 +135,7 @@ name(Type) ->
start_link(Type) -> start_link(Type) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% gen_server callbacks %%% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -156,17 +152,10 @@ start_link(Type) ->
{stop, Reason :: term()} | {stop, Reason :: term()} |
ignore. ignore.
init([Type]) -> init([Type]) ->
State = #{root => undefined, State = init_tree(Type),
counter => undefined, #{root := #{period := Perido}} = State,
index => 1,
zones => #{},
nodes => #{},
buckets => [],
type => Type},
State2 = init_tree(Type, State),
#{root := #{period := Perido}} = State2,
oscillate(Perido), oscillate(Perido),
{ok, State2}. {ok, State}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @private %% @private
@ -186,6 +175,10 @@ init([Type]) ->
handle_call(info, _From, State) -> handle_call(info, _From, State) ->
{reply, State, State}; {reply, State, State};
handle_call(update_config, _From, #{type := Type}) ->
NewState = init_tree(Type),
{reply, ok, NewState};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -273,59 +266,38 @@ oscillate(Interval) ->
-spec oscillation(state()) -> state(). -spec oscillation(state()) -> state().
oscillation(#{root := #{rate := Flow, oscillation(#{root := #{rate := Flow,
period := Interval, period := Interval,
childs := ChildIds,
consumed := Consumed} = Root, consumed := Consumed} = Root,
nodes := Nodes} = State) -> buckets := Buckets} = State) ->
oscillate(Interval), oscillate(Interval),
Childs = get_ordered_childs(ChildIds, Nodes), Ordereds = get_ordered_buckets(Buckets),
{Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
maybe_burst(State#{nodes := Nodes2, maybe_burst(State#{buckets := Buckets2,
root := Root#{consumed := Consumed + Alloced}}). root := Root#{consumed := Consumed + Alloced}}).
%% @doc horizontal spread %% @doc horizontal spread
-spec transverse(list(node_data()), -spec transverse(list(bucket()),
flow(), flow(),
non_neg_integer(), non_neg_integer(),
nodes()) -> {non_neg_integer(), nodes()}. buckets()) -> {non_neg_integer(), buckets()}.
transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 -> transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
{NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes), {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
InFlow2 = sub(InFlow, NodeAlloced), InFlow2 = sub(InFlow, BucketAlloced),
Alloced2 = Alloced + NodeAlloced, Alloced2 = Alloced + BucketAlloced,
transverse(T, InFlow2, Alloced2, Nodes2); transverse(T, InFlow2, Alloced2, Buckets2);
transverse(_, _, Alloced, Nodes) -> transverse(_, _, Alloced, Buckets) ->
{Alloced, Nodes}. {Alloced, Buckets}.
%% @doc vertical spread %% @doc vertical spread
-spec longitudinal(node_data(), flow(), nodes()) -> -spec longitudinal(bucket(), flow(), buckets()) ->
{non_neg_integer(), nodes()}. {non_neg_integer(), buckets()}.
longitudinal(#{id := Id, longitudinal(#{name := Name,
rate := Rate,
obtained := Obtained,
childs := ChildIds} = Node, InFlow, Nodes) ->
Flow = erlang:min(InFlow, Rate),
if Flow > 0 ->
Childs = get_ordered_childs(ChildIds, Nodes),
{Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
if Alloced > 0 ->
{Alloced,
Nodes2#{Id => Node#{obtained := Obtained + Alloced}}};
true ->
%% childs are empty or all counter childs are full
{0, Nodes2}
end;
true ->
{0, Nodes}
end;
longitudinal(#{id := Id,
rate := Rate, rate := Rate,
capacity := Capacity, capacity := Capacity,
counter := Counter, counter := Counter,
index := Index, index := Index,
obtained := Obtained} = Node, obtained := Obtained} = Bucket,
InFlow, Nodes) when Counter =/= undefined -> InFlow, Buckets) when Counter =/= undefined ->
Flow = erlang:min(InFlow, Rate), Flow = erlang:min(InFlow, Rate),
ShouldAlloc = ShouldAlloc =
@ -345,204 +317,157 @@ longitudinal(#{id := Id,
%% XXX if capacity is infinity, and flow always > 0, the value in %% XXX if capacity is infinity, and flow always > 0, the value in
%% counter will be overflow at some point in the future, do we need %% counter will be overflow at some point in the future, do we need
%% to deal with this situation??? %% to deal with this situation???
{Inc, Node2} = emqx_limiter_correction:add(Available, Node), {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
counters:add(Counter, Index, Inc), counters:add(Counter, Index, Inc),
{Inc, {Inc,
Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
_ -> _ ->
{0, Nodes} {0, Buckets}
end; end;
longitudinal(_, _, Nodes) -> longitudinal(_, _, Buckets) ->
{0, Nodes}. {0, Buckets}.
-spec get_ordered_childs(list(node_id()), nodes()) -> list(node_data()). -spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()).
get_ordered_childs(Ids, Nodes) -> get_ordered_buckets(Buckets) when is_map(Buckets) ->
Childs = [maps:get(Id, Nodes) || Id <- Ids], BucketList = maps:values(Buckets),
get_ordered_buckets(BucketList);
get_ordered_buckets(Buckets) ->
%% sort by obtained, avoid node goes hungry %% sort by obtained, avoid node goes hungry
lists:sort(fun(#{obtained := A}, #{obtained := B}) -> lists:sort(fun(#{obtained := A}, #{obtained := B}) ->
A < B A < B
end, end,
Childs). Buckets).
-spec maybe_burst(state()) -> state(). -spec maybe_burst(state()) -> state().
maybe_burst(#{buckets := Buckets, maybe_burst(#{buckets := Buckets,
zones := Zones, root := #{burst := Burst}} = State) when Burst > 0 ->
root := #{burst := Burst}, Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined ->
nodes := Nodes} = State) when Burst > 0 -> case counters:get(Cnt, Idx) > 0 of
%% find empty buckets and group by zone name true ->
GroupFun = fun(Id, Groups) -> Acc;
#{counter := Counter, false ->
index := Index, [Bucket | Acc]
zone := Zone} = maps:get(Id, Nodes), end;
case counters:get(Counter, Index) of (_Name, _Bucket, Acc) ->
Any when Any =< 0 -> Acc
Group = maps:get(Zone, Groups, []), end,
maps:put(Zone, [Id | Group], Groups);
_ ->
Groups
end
end,
case lists:foldl(GroupFun, #{}, Buckets) of Empties = maps:fold(Fold, [], Buckets),
Groups when map_size(Groups) > 0 -> dispatch_burst(Empties, Burst, State);
%% remove the zone which don't support burst
Filter = fun({Name, Childs}, Acc) ->
ZoneId = maps:get(Name, Zones),
#{burst := ZoneBurst} = Zone = maps:get(ZoneId, Nodes),
case ZoneBurst > 0 of
true ->
[{Zone, Childs} | Acc];
_ ->
Acc
end
end,
FilterL = lists:foldl(Filter, [], maps:to_list(Groups)),
dispatch_burst(FilterL, State);
_ ->
State
end;
maybe_burst(State) -> maybe_burst(State) ->
State. State.
-spec dispatch_burst(list({zone(), list(node_id())}), state()) -> state(). -spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state().
dispatch_burst([], State) -> dispatch_burst([], _, State) ->
State; State;
dispatch_burst(GroupL, dispatch_burst(Empties, InFlow,
#{root := #{burst := Burst}, #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) ->
nodes := Nodes} = State) -> EachFlow = InFlow / erlang:length(Empties),
InFlow = Burst / erlang:length(GroupL), {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
Dispatch = fun({Zone, Childs}, NodeAcc) -> State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
#{id := ZoneId,
burst := ZoneBurst,
obtained := Obtained} = Zone,
case erlang:min(InFlow, ZoneBurst) of -spec dispatch_burst_to_buckets(list(bucket()),
0 -> NodeAcc; float(),
ZoneFlow -> non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}.
EachFlow = ZoneFlow / erlang:length(Childs), dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
{Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), #{name := Name,
Zone2 = Zone#{obtained := Obtained + Alloced}, counter := Counter,
NodeAcc2#{ZoneId := Zone2}
end
end,
State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
-spec dispatch_burst_to_buckets(list(node_id()),
float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
#{counter := Counter,
index := Index, index := Index,
obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), obtained := Obtained} = Bucket,
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
counters:add(Counter, Index, Inc), counters:add(Counter, Index, Inc),
Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}},
dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2);
dispatch_burst_to_buckets([], _, Alloced, Nodes) -> dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
{Alloced, Nodes}. {Alloced, Buckets}.
-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). -spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type, State) -> init_tree(Type) ->
case emqx:get_config([limiter, Type]) of State = #{ type => Type
#{global := Global, , root => undefined
zone := Zone, , counter => undefined
bucket := Bucket} -> ok; , index => 1
#{bucket := Bucket} -> , buckets => #{}
Global = default_rate_burst_cfg(), },
Zone = #{default => default_rate_burst_cfg()},
ok
end,
{Factor, Root} = make_root(Global, Zone),
State2 = State#{root := Root},
{NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
State4 = State3#{counter := counters:new(maps:size(Bucket),
[write_concurrency])},
make_bucket(maps:to_list(Bucket), Global, Zone, Factor, NodeId, [], State4).
-spec make_root(hocons:confg(), hocon:config()) -> {number(), root()}. #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
make_root(#{rate := Rate, burst := Burst}, Zone) -> {Factor, Root} = make_root(Cfg),
ZoneNum = maps:size(Zone), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
Childs = lists:seq(1, ZoneNum),
State2 = State#{root := Root,
counter := counters:new(CounterNum, [write_concurrency])
},
lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
-spec make_root(hocons:confg()) -> {number(), root()}.
make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
{1, #{rate => Rate,
burst => Burst,
period => emqx_limiter_schema:minimum_period(),
consumed => 0}};
make_root(#{rate := Rate, burst := Burst}) ->
MiniPeriod = emqx_limiter_schema:minimum_period(), MiniPeriod = emqx_limiter_schema:minimum_period(),
if Rate >= 1 -> Factor = 1 / Rate,
{1, #{rate => Rate, {Factor, #{rate => 1,
burst => Burst, burst => Burst * Factor,
period => MiniPeriod, period => erlang:floor(Factor * MiniPeriod),
childs => Childs, consumed => 0}}.
consumed => 0}};
true ->
Factor = 1 / Rate,
{Factor, #{rate => 1,
burst => Burst * Factor,
period => erlang:floor(Factor * MiniPeriod),
childs => Childs,
consumed => 0}}
end.
make_zone([{Name, ZoneCfg} | T], Factor, NodeId, State) -> make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
#{rate := Rate, burst := Burst} = ZoneCfg, Path = emqx_limiter_manager:make_path(Type, Name),
#{zones := Zones, nodes := Nodes} = State, case get_counter_rate(Conf, GlobalCfg) of
Zone = #{id => NodeId, infinity ->
name => Name, Rate = infinity,
rate => mul(Rate, Factor), Capacity = infinity,
burst => Burst, Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
obtained => 0, emqx_limiter_manager:insert_bucket(Path, Ref),
childs => []}, CounterNum2 = CounterNum,
State2 = State#{zones := Zones#{Name => NodeId}, InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
nodes := Nodes#{NodeId => Zone}}, State#{buckets := Buckets#{BucketName => Bucket}}
make_zone(T, Factor, NodeId + 1, State2); end;
make_zone([], _, NodeId, State2) ->
{NodeId, State2}.
make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Type} = State) ->
#{zone := ZoneName,
aggregated := Aggregated} = Conf,
Path = emqx_limiter_manager:make_path(Type, ZoneName, Name),
case get_counter_rate(Conf, Zone, Global) of
infinity ->
State2 = State,
Rate = infinity,
Capacity = infinity,
Counter = undefined,
Index = undefined,
Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate),
emqx_limiter_manager:insert_bucket(Path, Ref);
RawRate -> RawRate ->
#{capacity := Capacity} = Aggregated, #{capacity := Capacity} = Conf,
Initial = get_initial_val(Aggregated), Initial = get_initial_val(Conf),
{Counter, Index, State2} = alloc_counter(Path, RawRate, Initial, State), Rate = mul(RawRate, Factor),
Rate = mul(RawRate, Factor) CounterNum2 = CounterNum + 1,
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
{Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
Bucket2 = Bucket#{counter := Counter, index := Idx},
State2#{buckets := Buckets#{BucketName => Bucket2}}
end
end, end,
Node = #{ id => Id Bucket = #{ name => Name
, name => Name , rate => Rate
, zone => ZoneName , obtained => 0
, rate => Rate , correction => 0
, obtained => 0 , capacity => Capacity
, correction => 0 , counter => undefined
, capacity => Capacity , index => undefined},
, counter => Counter
, index => Index},
State3 = add_zone_child(Id, Node, ZoneName, State2), DelayInit = ?CURRYING(Bucket, InitFun),
make_bucket(T, Global, Zone, Factor, Id + 1, [Id | Buckets], State3);
make_bucket([], _, _, _, _, Buckets, State) -> make_bucket(T,
State#{buckets := Buckets}. Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]);
make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
{CounterNum, DelayBuckets}.
-spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
{counters:counters_ref(), pos_integer(), state()}. {counters:counters_ref(), pos_integer(), state()}.
alloc_counter(Path, Rate, Initial, alloc_counter(Path, Rate, Initial,
#{counter := Counter, index := Index} = State) -> #{counter := Counter, index := Index} = State) ->
case emqx_limiter_manager:find_bucket(Path) of case emqx_limiter_manager:find_bucket(Path) of
{ok, #{counter := ECounter, {ok, #{counter := ECounter,
index := EIndex}} when ECounter =/= undefined -> index := EIndex}} when ECounter =/= undefined ->
@ -558,33 +483,16 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
emqx_limiter_manager:insert_bucket(Path, Ref), emqx_limiter_manager:insert_bucket(Path, Ref),
{Counter, Index, State}. {Counter, Index, State}.
-spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state().
add_zone_child(NodeId, Bucket, Name, #{zones := Zones, nodes := Nodes} = State) ->
ZoneId = maps:get(Name, Zones),
#{childs := Childs} = Zone = maps:get(ZoneId, Nodes),
Nodes2 = Nodes#{ZoneId => Zone#{childs := [NodeId | Childs]},
NodeId => Bucket},
State#{nodes := Nodes2}.
%% @doc find first limited node %% @doc find first limited node
get_counter_rate(#{zone := ZoneName, get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg)
aggregated := Cfg}, ZoneCfg, Global) -> when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity
Zone = maps:get(ZoneName, ZoneCfg), Rate;
Search = lists:search(fun(E) -> is_limited(E) end,
[Cfg, Zone, Global]),
case Search of
{value, #{rate := Rate}} ->
Rate;
false ->
infinity
end.
is_limited(#{rate := Rate, capacity := Capacity}) -> get_counter_rate(_Cfg, #{rate := Rate}) ->
Rate =/= infinity orelse Capacity =/= infinity; Rate.
is_limited(#{rate := Rate}) ->
Rate =/= infinity.
-spec get_initial_val(hocons:config()) -> decimal().
get_initial_val(#{initial := Initial, get_initial_val(#{initial := Initial,
rate := Rate, rate := Rate,
capacity := Capacity}) -> capacity := Capacity}) ->
@ -598,6 +506,3 @@ get_initial_val(#{initial := Initial,
true -> true ->
0 0
end. end.
default_rate_burst_cfg() ->
#{rate => infinity, burst => 0}.

View File

@ -46,6 +46,7 @@ start(Type) ->
Spec = make_child(Type), Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec). supervisor:start_child(?MODULE, Spec).
%% XXX This is maybe a workaround, not so good
-spec restart(emqx_limiter_schema:limiter_type()) -> _. -spec restart(emqx_limiter_schema:limiter_type()) -> _.
restart(Type) -> restart(Type) ->
Id = emqx_limiter_server:name(Type), Id = emqx_limiter_server:name(Type),

View File

@ -1197,7 +1197,7 @@ base_listener() ->
#{ default => 'default' #{ default => 'default'
})} })}
, {"limiter", , {"limiter",
sc(map("ratelimit bucket's name", atom()), #{default => #{}})} sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})}
]. ].
%% utils %% utils

View File

@ -113,57 +113,32 @@ listener_mqtt_ws_conf() ->
listeners_conf() -> listeners_conf() ->
#{tcp => #{default => listener_mqtt_tcp_conf()}, #{tcp => #{default => listener_mqtt_tcp_conf()},
ws => #{default => listener_mqtt_ws_conf()} ws => #{default => listener_mqtt_ws_conf()}
}. }.
limiter_conf() -> limiter_conf() ->
#{bytes_in => Make = fun() ->
#{bucket => #{bucket =>
#{default => #{default =>
#{aggregated => #{capacity => infinity,
#{capacity => infinity,initial => 0,rate => infinity}, initial => 0,
per_client => rate => infinity,
#{capacity => infinity,divisible => false, per_client =>
failure_strategy => force,initial => 0,low_water_mark => 0, #{capacity => infinity,divisible => false,
max_retry_time => 5000,rate => infinity}, failure_strategy => force,initial => 0,low_water_mark => 0,
zone => default}}, max_retry_time => 5000,rate => infinity
global => #{burst => 0,rate => infinity}, }
zone => #{default => #{burst => 0,rate => infinity}}}, }
connection => },
#{bucket => burst => 0,
#{default => rate => infinity
#{aggregated => }
#{capacity => infinity,initial => 0,rate => infinity}, end,
per_client =>
#{capacity => infinity,divisible => false, lists:foldl(fun(Name, Acc) ->
failure_strategy => force,initial => 0,low_water_mark => 0, Acc#{Name => Make()}
max_retry_time => 5000,rate => infinity}, end,
zone => default}}, #{},
global => #{burst => 0,rate => infinity}, [bytes_in, message_in, message_routing, connection, batch]).
zone => #{default => #{burst => 0,rate => infinity}}},
message_in =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}},
message_routing =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}}}.
stats_conf() -> stats_conf() ->
#{enable => true}. #{enable => true}.
@ -232,7 +207,7 @@ end_per_suite(_Config) ->
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
OldConf = set_test_listener_confs(), OldConf = set_test_listener_confs(),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
modify_limiter(TestCase, OldConf), check_modify_limiter(TestCase),
[{config, OldConf}|Config]. [{config, OldConf}|Config].
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
@ -240,18 +215,19 @@ end_per_testcase(_TestCase, Config) ->
emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:stop_apps([]),
Config. Config.
modify_limiter(TestCase, NewConf) -> check_modify_limiter(TestCase) ->
Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2], Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2],
case lists:member(TestCase, Checks) of case lists:member(TestCase, Checks) of
true -> true ->
modify_limiter(NewConf); modify_limiter();
_ -> _ ->
ok ok
end. end.
%% per_client 5/1s,5 %% per_client 5/1s,5
%% aggregated 10/1s,10 %% aggregated 10/1s,10
modify_limiter(#{limiter := Limiter} = NewConf) -> modify_limiter() ->
Limiter = emqx_config:get([limiter]),
#{message_routing := #{bucket := Bucket} = Routing} = Limiter, #{message_routing := #{bucket := Bucket} = Routing} = Limiter,
#{default := #{per_client := Client} = Default} = Bucket, #{default := #{per_client := Client} = Default} = Bucket,
Client2 = Client#{rate := 5, Client2 = Client#{rate := 5,
@ -259,16 +235,15 @@ modify_limiter(#{limiter := Limiter} = NewConf) ->
capacity := 5, capacity := 5,
low_water_mark := 1}, low_water_mark := 1},
Default2 = Default#{per_client := Client2, Default2 = Default#{per_client := Client2,
aggregated := #{rate => 10, rate => 10,
initial => 0, initial => 0,
capacity => 10 capacity => 10},
}},
Bucket2 = Bucket#{default := Default2}, Bucket2 = Bucket#{default := Default2},
Routing2 = Routing#{bucket := Bucket2}, Routing2 = Routing#{bucket := Bucket2},
NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}}, emqx_config:put([limiter], Limiter#{message_routing := Routing2}),
emqx_config:put(NewConf2),
emqx_limiter_manager:restart_server(message_routing), emqx_limiter_manager:restart_server(message_routing),
timer:sleep(100),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1078,4 +1053,4 @@ session(InitFields) when is_map(InitFields) ->
quota() -> quota() ->
emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()). emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()).
limiter_cfg() -> #{}. limiter_cfg() -> #{message_routing => default}.

View File

@ -27,59 +27,37 @@
-define(BASE_CONF, <<""" -define(BASE_CONF, <<"""
limiter { limiter {
bytes_in { bytes_in {
global.rate = infinity
zone.default.rate = infinity
bucket.default { bucket.default {
zone = default rate = infinity
aggregated.rate = infinity capacity = infinity
aggregated.capacity = infinity
per_client.rate = \"100MB/1s\"
per_client.capacity = infinity
} }
} }
message_in { message_in {
global.rate = infinity
zone.default.rate = infinity
bucket.default { bucket.default {
zone = default rate = infinity
aggregated.rate = infinity capacity = infinity
aggregated.capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
connection { connection {
global.rate = infinity
zone.default.rate = infinity
bucket.default { bucket.default {
zone = default rate = infinity
aggregated.rate = infinity capacity = infinity
aggregated.capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
message_routing { message_routing {
global.rate = infinity
zone.default.rate = infinity
bucket.default { bucket.default {
zone = default rate = infinity
aggregated.rate = infinity capacity = infinity
aggregated.capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
shared { batch {
bucket.retainer { bucket.retainer {
aggregated.rate = infinity rate = infinity
aggregated.capacity = infinity capacity = infinity
per_client.rate = infinity
per_client.capacity = infinity
} }
} }
} }
@ -97,6 +75,7 @@ limiter {
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(RATE(Rate), to_rate(Rate)). -define(RATE(Rate), to_rate(Rate)).
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(CONST(X), fun(_) -> X end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
@ -231,12 +210,11 @@ t_low_water_mark(_) ->
with_per_client(default, Cfg, Case). with_per_client(default, Cfg, Case).
t_infinity_client(_) -> t_infinity_client(_) ->
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Fun = fun(#{per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := infinity, Bucket2 = Bucket#{rate := infinity,
capacity := infinity}, capacity := infinity},
Cli2 = Cli#{rate := infinity, capacity := infinity}, Cli2 = Cli#{rate := infinity, capacity := infinity},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
Client = connect(default), Client = connect(default),
@ -247,14 +225,13 @@ t_infinity_client(_) ->
with_bucket(default, Fun, Case). with_bucket(default, Fun, Case).
t_try_restore_agg(_) -> t_try_restore_agg(_) ->
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Fun = fun(#{per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := 1, Bucket2 = Bucket#{rate := 1,
capacity := 200, capacity := 200,
initial := 50}, initial := 50},
Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true, Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true,
max_retry_time := 100, failure_strategy := force}, max_retry_time := 100, failure_strategy := force},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
Client = connect(default), Client = connect(default),
@ -267,15 +244,14 @@ t_try_restore_agg(_) ->
with_bucket(default, Fun, Case). with_bucket(default, Fun, Case).
t_short_board(_) -> t_short_board(_) ->
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Fun = fun(#{per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := ?RATE("100/1s"), Bucket2 = Bucket#{rate := ?RATE("100/1s"),
initial := 0, initial := 0,
capacity := 100}, capacity := 100},
Cli2 = Cli#{rate := ?RATE("600/1s"), Cli2 = Cli#{rate := ?RATE("600/1s"),
capacity := 600, capacity := 600,
initial := 600}, initial := 600},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
Counter = counters:new(1, []), Counter = counters:new(1, []),
@ -286,15 +262,14 @@ t_short_board(_) ->
with_bucket(default, Fun, Case). with_bucket(default, Fun, Case).
t_rate(_) -> t_rate(_) ->
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Fun = fun(#{per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := ?RATE("100/100ms"), Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
initial := 0, initial := 0,
capacity := infinity}, capacity := infinity},
Cli2 = Cli#{rate := infinity, Cli2 = Cli#{rate := infinity,
capacity := infinity, capacity := infinity,
initial := 0}, initial := 0},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
Client = connect(default), Client = connect(default),
@ -311,113 +286,74 @@ t_rate(_) ->
t_capacity(_) -> t_capacity(_) ->
Capacity = 600, Capacity = 600,
Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Fun = fun(#{per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := ?RATE("100/100ms"), Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
initial := 0, initial := 0,
capacity := 600}, capacity := 600},
Cli2 = Cli#{rate := infinity, Cli2 = Cli#{rate := infinity,
capacity := infinity, capacity := infinity,
initial := 0}, initial := 0},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
Client = connect(default), Client = connect(default),
timer:sleep(1000), timer:sleep(1000),
C1 = emqx_htb_limiter:available(Client), C1 = emqx_htb_limiter:available(Client),
?assertEqual(Capacity, C1, "test bucket capacity") ?assertEqual(Capacity, C1, "test bucket capacity")
end, end,
with_bucket(default, Fun, Case). with_bucket(default, Fun, Case).
%%--------------------------------------------------------------------
%% Test Cases Zone Level
%%--------------------------------------------------------------------
t_limit_zone_with_unlimit_bucket(_) ->
ZoneMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s"),
burst := ?RATE("60/1s")}
end,
Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := infinity,
initial := 0,
capacity := infinity},
Cli2 = Cli#{rate := infinity,
initial := 0,
capacity := infinity,
divisible := true},
Bucket#{aggregated := Aggr2, per_client := Cli2}
end,
Case = fun() ->
C1 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 20),
timer:sleep(2100),
check_average_rate(C1, 2, 600)
end,
with_zone(default, ZoneMod, [{b1, Bucket}], Case).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases Global Level %% Test Cases Global Level
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_burst_and_fairness(_) -> t_collaborative_alloc(_) ->
GlobalMod = fun(Cfg) -> GlobalMod = fun(Cfg) ->
Cfg#{burst := ?RATE("60/1s")} Cfg#{rate := ?RATE("600/1s")}
end, end,
ZoneMod = fun(Cfg) -> Bucket1 = fun(#{per_client := Cli} = Bucket) ->
Cfg#{rate := ?RATE("600/1s"), Bucket2 = Bucket#{rate := ?RATE("400/1s"),
burst := ?RATE("60/1s")} initial := 0,
end, capacity := 600},
Cli2 = Cli#{rate := ?RATE("50"),
Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> capacity := 100,
Aggr2 = Aggr#{rate := ?RATE("500/1s"), initial := 100},
initial := 0, Bucket2#{per_client := Cli2}
capacity := 500},
Cli2 = Cli#{rate := ?RATE("600/1s"),
capacity := 600,
initial := 600},
Bucket#{aggregated := Aggr2,
per_client := Cli2}
end, end,
Bucket2 = fun(Bucket) ->
Bucket2 = Bucket1(Bucket),
Bucket2#{rate := ?RATE("200/1s")}
end,
Case = fun() -> Case = fun() ->
C1 = counters:new(1, []), C1 = counters:new(1, []),
C2 = counters:new(1, []), C2 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 20), start_client(b1, ?NOW + 2000, C1, 20),
start_client(b2, ?NOW + 2000, C2, 30), start_client(b2, ?NOW + 2000, C2, 30),
timer:sleep(2100), timer:sleep(2100),
check_average_rate(C1, 2, 330), check_average_rate(C1, 2, 300),
check_average_rate(C2, 2, 330) check_average_rate(C2, 2, 300)
end, end,
with_global(GlobalMod, with_global(GlobalMod,
default, [{b1, Bucket1}, {b2, Bucket2}],
ZoneMod,
[{b1, Bucket}, {b2, Bucket}],
Case). Case).
t_burst(_) -> t_burst(_) ->
GlobalMod = fun(Cfg) -> GlobalMod = fun(Cfg) ->
Cfg#{burst := ?RATE("60/1s")} Cfg#{rate := ?RATE("200/1s"),
burst := ?RATE("400/1s")}
end, end,
ZoneMod = fun(Cfg) -> Bucket = fun(#{per_client := Cli} = Bucket) ->
Cfg#{rate := ?RATE("60/1s"), Bucket2 = Bucket#{rate := ?RATE("200/1s"),
burst := ?RATE("60/1s")} initial := 0,
end, capacity := 200},
Cli2 = Cli#{rate := ?RATE("50/1s"),
Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> capacity := 200,
Aggr2 = Aggr#{rate := ?RATE("500/1s"),
initial := 0,
capacity := 500},
Cli2 = Cli#{rate := ?RATE("600/1s"),
capacity := 600,
divisible := true}, divisible := true},
Bucket#{aggregated := Aggr2, Bucket2#{per_client := Cli2}
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
@ -430,180 +366,39 @@ t_burst(_) ->
timer:sleep(2100), timer:sleep(2100),
Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]), Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
in_range(Total / 2, 30) in_range(Total / 2, 300)
end, end,
with_global(GlobalMod, with_global(GlobalMod,
default,
ZoneMod,
[{b1, Bucket}, {b2, Bucket}, {b3, Bucket}], [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}],
Case). Case).
t_limit_global_with_unlimit_other(_) -> t_limit_global_with_unlimit_other(_) ->
GlobalMod = fun(Cfg) -> GlobalMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")} Cfg#{rate := ?RATE("600/1s")}
end, end,
ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end, Bucket = fun(#{per_client := Cli} = Bucket) ->
Bucket2 = Bucket#{rate := infinity,
Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> initial := 0,
Aggr2 = Aggr#{rate := infinity, capacity := infinity},
initial := 0, Cli2 = Cli#{rate := infinity,
capacity := infinity}, capacity := infinity,
Cli2 = Cli#{rate := infinity, initial := 0},
capacity := infinity, Bucket2#{per_client := Cli2}
initial := 0},
Bucket#{aggregated := Aggr2,
per_client := Cli2}
end, end,
Case = fun() -> Case = fun() ->
C1 = counters:new(1, []), C1 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 20), start_client(b1, ?NOW + 2000, C1, 20),
timer:sleep(2100), timer:sleep(2100),
check_average_rate(C1, 2, 600) check_average_rate(C1, 2, 600)
end, end,
with_global(GlobalMod, with_global(GlobalMod,
default,
ZoneMod,
[{b1, Bucket}], [{b1, Bucket}],
Case). Case).
t_multi_zones(_) ->
GlobalMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")}
end,
Zone1 = fun(Cfg) ->
Cfg#{rate := ?RATE("400/1s")}
end,
Zone2 = fun(Cfg) ->
Cfg#{rate := ?RATE("500/1s")}
end,
Bucket = fun(Zone, Rate) ->
fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := infinity,
initial := 0,
capacity := infinity},
Cli2 = Cli#{rate := Rate,
capacity := infinity,
initial := 0},
Bucket#{aggregated := Aggr2,
per_client := Cli2,
zone := Zone}
end
end,
Case = fun() ->
C1 = counters:new(1, []),
C2 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 25),
start_client(b2, ?NOW + 2000, C2, 20),
timer:sleep(2100),
check_average_rate(C1, 2, 300),
check_average_rate(C2, 2, 300)
end,
with_global(GlobalMod,
[z1, z2],
[Zone1, Zone2],
[{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
Case).
%% because the simulated client will try to reach the maximum rate
%% when divisiable = true, a large number of divided tokens will be generated
%% so this is not an accurate test
t_multi_zones_with_divisible(_) ->
GlobalMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")}
end,
Zone1 = fun(Cfg) ->
Cfg#{rate := ?RATE("400/1s")}
end,
Zone2 = fun(Cfg) ->
Cfg#{rate := ?RATE("500/1s")}
end,
Bucket = fun(Zone, Rate) ->
fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := Rate,
initial := 0,
capacity := infinity},
Cli2 = Cli#{rate := Rate,
divisible := true,
capacity := infinity,
initial := 0},
Bucket#{aggregated := Aggr2,
per_client := Cli2,
zone := Zone}
end
end,
Case = fun() ->
C1 = counters:new(1, []),
C2 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 25),
start_client(b2, ?NOW + 2000, C2, 20),
timer:sleep(2100),
check_average_rate(C1, 2, 300),
check_average_rate(C2, 2, 300)
end,
with_global(GlobalMod,
[z1, z2],
[Zone1, Zone2],
[{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
Case).
t_zone_hunger_and_fair(_) ->
GlobalMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")}
end,
Zone1 = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")}
end,
Zone2 = fun(Cfg) ->
Cfg#{rate := ?RATE("50/1s")}
end,
Bucket = fun(Zone, Rate) ->
fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
Aggr2 = Aggr#{rate := infinity,
initial := 0,
capacity := infinity},
Cli2 = Cli#{rate := Rate,
capacity := infinity,
initial := 0},
Bucket#{aggregated := Aggr2,
per_client := Cli2,
zone := Zone}
end
end,
Case = fun() ->
C1 = counters:new(1, []),
C2 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 20),
start_client(b2, ?NOW + 2000, C2, 20),
timer:sleep(2100),
check_average_rate(C1, 2, 550),
check_average_rate(C2, 2, 50)
end,
with_global(GlobalMod,
[z1, z2],
[Zone1, Zone2],
[{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}],
Case).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases container %% Test Cases container
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -626,7 +421,8 @@ t_check_container(_) ->
capacity := 1000} capacity := 1000}
end, end,
Case = fun() -> Case = fun() ->
C1 = emqx_limiter_container:new([message_routing]), C1 = emqx_limiter_container:new([message_routing],
#{message_routing => default}),
{ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
{pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
timer:sleep(Pause), timer:sleep(Pause),
@ -663,9 +459,7 @@ t_limiter_server(_) ->
?assertMatch(#{root := _, ?assertMatch(#{root := _,
counter := _, counter := _,
index := _, index := _,
zones := _,
buckets := _, buckets := _,
nodes := _,
type := message_routing}, State), type := message_routing}, State),
Name = emqx_limiter_server:name(message_routing), Name = emqx_limiter_server:name(message_routing),
@ -675,6 +469,32 @@ t_limiter_server(_) ->
ok = emqx_limiter_server:format_status(normal, ok), ok = emqx_limiter_server:format_status(normal, ok),
ok. ok.
%% Verifies emqx_limiter_decimal arithmetic: plain integers follow normal
%% integer semantics, while the atom 'infinity' is absorbing for every op.
t_decimal(_) ->
    Add = fun emqx_limiter_decimal:add/2,
    Sub = fun emqx_limiter_decimal:sub/2,
    Mul = fun emqx_limiter_decimal:mul/2,
    FloorDiv = fun emqx_limiter_decimal:floor_div/2,
    %% finite operands
    ?assertEqual(5, Add(2, 3)),
    ?assertEqual(-1, Sub(2, 3)),
    ?assertEqual(6, Mul(2, 3)),
    ?assertEqual(2, FloorDiv(7, 3)),
    %% 'infinity' as the left operand is absorbing
    ?assertEqual(infinity, Add(infinity, 3)),
    ?assertEqual(infinity, Sub(infinity, 3)),
    ?assertEqual(infinity, Mul(infinity, 3)),
    ?assertEqual(infinity, FloorDiv(infinity, 3)),
    ok.
%% Checks the emqx_limiter_schema string parsers: rate strings
%% ("<count>/<duration>" or "infinity") and capacity strings with
%% binary (1024-based) size suffixes.
t_schema_unit(_) ->
    Schema = emqx_limiter_schema,
    KB = 1024,
    MB = KB * 1024,
    GB = MB * 1024,
    ?assertEqual(limiter, Schema:namespace()),
    %% rate parsing; surrounding whitespace is tolerated
    ?assertEqual({ok, infinity}, Schema:to_rate(" infinity ")),
    ?assertMatch({ok, _}, Schema:to_rate("100")),
    ?assertMatch({ok, _}, Schema:to_rate("100/10s")),
    %% a zero rate and an unknown duration unit are both rejected
    ?assertMatch({error, _}, Schema:to_rate("0")),
    ?assertMatch({error, _}, Schema:to_rate("100/10x")),
    %% capacity parsing
    ?assertEqual({ok, infinity}, Schema:to_capacity("infinity")),
    ?assertEqual({ok, 100}, Schema:to_capacity("100")),
    ?assertEqual({ok, 100 * KB}, Schema:to_capacity("100KB")),
    ?assertEqual({ok, 100 * MB}, Schema:to_capacity("100MB")),
    ?assertEqual({ok, 100 * GB}, Schema:to_capacity("100GB")),
    ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -752,7 +572,6 @@ client_try_check(Need, #client{counter = Counter,
end end
end. end.
%% XXX not a good test, because the client's rate may be bigger than the global rate %% XXX not a good test, because the client's rate may be bigger than the global rate
%% so if the client's rate = infinity %% so if the client's rate = infinity
%% the client's divisible should be true, or its capacity must be bigger than the number consumed each time %% the client's divisible should be true, or its capacity must be bigger than the number consumed each time
@ -769,25 +588,17 @@ to_rate(Str) ->
{ok, Rate} = emqx_limiter_schema:to_rate(Str), {ok, Rate} = emqx_limiter_schema:to_rate(Str),
Rate. Rate.
with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) -> with_global(Modifier, BuckeTemps, Case) ->
Path = [limiter, message_routing], Fun = fun(Cfg) ->
#{global := Global} = Cfg = emqx_config:get(Path), #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg),
Cfg2 = Cfg#{global := Modifier(Global)}, Fun = fun({Name, BMod}, Acc) ->
with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case). Acc#{Name => BMod(BucketCfg)}
end,
Buckets = lists:foldl(Fun, #{}, BuckeTemps),
Cfg2#{bucket := Buckets}
end,
with_zone(Name, Modifier, Buckets, Case) -> with_config([limiter, message_routing], Fun, Case).
Path = [limiter, message_routing],
Cfg = emqx_config:get(Path),
with_zone(Cfg, Name, Modifier, Buckets, Case).
with_zone(Cfg, Name, Modifier, Buckets, Case) ->
Path = [limiter, message_routing],
#{zone := ZoneCfgs,
bucket := BucketCfgs} = Cfg,
ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs),
BucketCfgs2 = apply_modifier(Buckets, BucketCfgs),
Cfg2 = Cfg#{zone := ZoneCfgs2, bucket := BucketCfgs2},
with_config(Path, fun(_) -> Cfg2 end, Case).
with_bucket(Bucket, Modifier, Case) -> with_bucket(Bucket, Modifier, Case) ->
Path = [limiter, message_routing, bucket, Bucket], Path = [limiter, message_routing, bucket, Bucket],
@ -802,8 +613,8 @@ with_config(Path, Modifier, Case) ->
NewCfg = Modifier(Cfg), NewCfg = Modifier(Cfg),
ct:pal("test with config:~p~n", [NewCfg]), ct:pal("test with config:~p~n", [NewCfg]),
emqx_config:put(Path, NewCfg), emqx_config:put(Path, NewCfg),
emqx_limiter_manager:restart_server(message_routing), emqx_limiter_server:update_config(message_routing),
timer:sleep(100), timer:sleep(500),
DelayReturn = delay_return(Case), DelayReturn = delay_return(Case),
emqx_config:put(Path, Cfg), emqx_config:put(Path, Cfg),
DelayReturn(). DelayReturn().

View File

@ -405,16 +405,30 @@ t_handle_timeout_emit_stats(_) ->
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)). ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
t_ensure_rate_limit(_) -> t_ensure_rate_limit(_) ->
%% XXX In the future, limiter should provide API for config update
Path = [limiter, bytes_in, bucket, default, per_client],
PerClient = emqx_config:get(Path),
{ok, Rate}= emqx_limiter_schema:to_rate("50MB"),
emqx_config:put(Path, PerClient#{rate := Rate}),
emqx_limiter_server:update_config(bytes_in),
timer:sleep(100),
Limiter = init_limiter(), Limiter = init_limiter(),
St = st(#{limiter => Limiter}), St = st(#{limiter => Limiter}),
{ok, Need} = emqx_limiter_schema:to_capacity("1GB"), %% must bigger than value in emqx_ratelimit_SUITE
%% must bigger than value in emqx_ratelimit_SUITE
{ok, Need} = emqx_limiter_schema:to_capacity("1GB"),
St1 = ?ws_conn:check_limiter([{Need, bytes_in}], St1 = ?ws_conn:check_limiter([{Need, bytes_in}],
[], [],
fun(_, _, S) -> S end, fun(_, _, S) -> S end,
[], [],
St), St),
?assertEqual(blocked, ?ws_conn:info(sockstate, St1)), ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)). ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
emqx_config:put(Path, PerClient),
emqx_limiter_server:update_config(bytes_in),
timer:sleep(100).
t_parse_incoming(_) -> t_parse_incoming(_) ->
{Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()), {Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()),
@ -558,7 +572,7 @@ ws_client(State) ->
ct:fail(ws_timeout) ct:fail(ws_timeout)
end. end.
limiter_cfg() -> #{}. limiter_cfg() -> #{bytes_in => default, message_in => default}.
init_limiter() -> init_limiter() ->
emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()). emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).

View File

@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) ->
#{type => string, example => <<"force">>}; #{type => string, example => <<"force">>};
typename_to_spec("initial()", _Mod) -> typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0M">>}; #{type => string, example => <<"0M">>};
typename_to_spec("bucket_name()", _Mod) ->
#{type => string, example => <<"retainer">>};
typename_to_spec(Name, Mod) -> typename_to_spec(Name, Mod) ->
Spec = range(Name), Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod), Spec1 = remote_module_type(Spec, Name, Mod),

View File

@ -57,10 +57,15 @@ retainer {
## Default: 0 ## Default: 0
batch_deliver_number = 0 batch_deliver_number = 0
## deliver limiter bucket ## The rate limiter name for retained messages delivery.
## In order to avoid delivering too many messages to the client at once, which may cause the client
## to block or crash, or message dropped due to exceeding the size of the message queue. We need
## to specify a rate limiter for the retained messages delivery to the client.
## ##
## Default: 0s ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
limiter_bucket_name = retainer ## You can remove this field if you don't want any limit
## Default: retainer
batch_deliver_limiter = retainer
} }
## Maximum retained message size. ## Maximum retained message size.

View File

@ -85,8 +85,8 @@ start_link(Pool, Id) ->
init([Pool, Id]) -> init([Pool, Id]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}), true = gproc_pool:connect_worker(Pool, {Pool, Id}),
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(shared, Bucket), Limiter = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}. {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{noreply, State#{limiter := Limiter2}}; {noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) -> handle_cast(refresh_limiter, State) ->
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(shared, Bucket), Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}}; {noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
@ -198,14 +198,15 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) ->
Mod = emqx_retainer:get_backend_module(), Mod = emqx_retainer:get_backend_module(),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false -> false ->
{ok, Result} = Mod:read_message(Context, Topic), {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
deliver(Result, Context, Pid, Topic, undefined, Limiter); deliver(Result, Context, Pid, Topic, undefined, Limiter);
true -> true ->
{ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
deliver(Result, Context, Pid, Topic, NewCursor, Limiter) deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
end. end.
-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
{ok, limiter()}.
deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
{ok, Limiter}; {ok, Limiter};

View File

@ -29,7 +29,7 @@ fields(mnesia_config) ->
fields(flow_control) -> fields(flow_control) ->
[ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
, {batch_deliver_number, sc(range(0, 1000), 0)} , {batch_deliver_number, sc(range(0, 1000), 0)}
, {limiter_bucket_name, sc(atom(), retainer)} , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)}
]. ].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ retainer {
flow_control { flow_control {
batch_read_number = 0 batch_read_number = 0
batch_deliver_number = 0 batch_deliver_number = 0
limiter_bucket_name = retainer batch_deliver_limiter = retainer
} }
backend { backend {
type = built_in_database type = built_in_database
@ -281,12 +281,11 @@ t_stop_publish_clear_msg(_) ->
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).
t_flow_control(_) -> t_flow_control(_) ->
#{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]), #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
RetainerCfg2 = RetainerCfg#{ RetainerCfg2 = RetainerCfg#{per_client :=
per_client := PerClient#{ PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), capacity := 1}},
capacity := 1}}, emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2),
emqx_limiter_manager:restart_server(shared), emqx_limiter_manager:restart_server(shared),
timer:sleep(500), timer:sleep(500),
@ -296,7 +295,7 @@ t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> => emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"batch_read_number">> => 1, #{<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1, <<"batch_deliver_number">> => 1,
<<"limiter_bucket_name">> => retainer}}), <<"batch_deliver_limiter">> => retainer}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
@ -326,7 +325,7 @@ t_flow_control(_) ->
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
%% recover the limiter %% recover the limiter
emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg), emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
emqx_limiter_manager:restart_server(shared), emqx_limiter_manager:restart_server(shared),
timer:sleep(500), timer:sleep(500),