diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf
index 4c1f1b7fb..a29903205 100644
--- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf
+++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf
@@ -6,50 +6,40 @@ limiter {
## rate limiter for message publish
bytes_in {
bucket.default {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
## rate limiter for message publish
message_in {
bucket.default {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
## connection rate limiter
connection {
bucket.default {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
## rate limiter for message deliver
message_routing {
bucket.default {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
- ## Some functions that don't need to use global and zone scope, them can shared use this type
- shared {
+ ## rate limiter for internal batch operation
+ batch {
bucket.retainer {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
}
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
index 45256c00b..2a4e13731 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
@@ -22,24 +22,28 @@
%% API
-export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2
- , consume/2, set_retry/2, retry/1, make_infinity_limiter/1
+ , consume/2, set_retry/2, retry/1, make_infinity_limiter/0
, make_future/1, available/1
]).
-export_type([token_bucket_limiter/0]).
%% a token bucket limiter with a limiter server's bucket reference
--type token_bucket_limiter() :: #{ tokens := non_neg_integer() %% the number of tokens currently available
+-type token_bucket_limiter() :: #{ %% the number of tokens currently available
+ tokens := non_neg_integer()
, rate := decimal()
, capacity := decimal()
, lasttime := millisecond()
- , max_retry_time := non_neg_integer() %% @see emqx_limiter_schema
- , failure_strategy := failure_strategy() %% @see emqx_limiter_schema
+ %% @see emqx_limiter_schema
+ , max_retry_time := non_neg_integer()
+ %% @see emqx_limiter_schema
+ , failure_strategy := failure_strategy()
, divisible := boolean() %% @see emqx_limiter_schema
, low_water_mark := non_neg_integer() %% @see emqx_limiter_schema
, bucket := bucket() %% the limiter server's bucket
%% retry contenxt
- , retry_ctx => undefined %% undefined meaning there is no retry context or no need to retry
+ %% undefined meaning no retry context or no need to retry
+ , retry_ctx => undefined
| retry_context(token_bucket_limiter()) %% the retry context
, atom => any() %% allow to add other keys
}.
@@ -119,8 +123,8 @@ make_token_bucket_limiter(Cfg, Bucket) ->
make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity ->
Cfg#{bucket => Bucket}.
--spec make_infinity_limiter(limiter_bucket_cfg()) -> infinity.
-make_infinity_limiter(_) ->
+-spec make_infinity_limiter() -> infinity.
+make_infinity_limiter() ->
infinity.
%% @doc request some tokens
@@ -247,12 +251,11 @@ try_consume(_, _, Limiter) ->
-spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter)
when Limiter :: limiter().
-do_check(Need, #{tokens := Tokens} = Limiter) ->
- if Need =< Tokens ->
- do_check_with_parent_limiter(Need, Limiter);
- true ->
- do_reset(Need, Limiter)
- end;
+do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens ->
+ do_check_with_parent_limiter(Need, Limiter);
+
+do_check(Need, #{tokens := _} = Limiter) ->
+ do_reset(Need, Limiter);
do_check(Need, #{divisible := Divisible,
bucket := Bucket} = Ref) ->
@@ -275,7 +278,8 @@ on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}).
--spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
+-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
+ inner_check_result(token_bucket_limiter()).
do_check_with_parent_limiter(Need,
#{tokens := Tokens,
divisible := Divisible,
@@ -301,17 +305,19 @@ do_reset(Need,
capacity := Capacity} = Limiter) ->
Now = ?NOW,
Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
- if Tokens2 >= Need ->
+
+ case erlang:floor(Tokens2) of
+ Available when Available >= Need ->
Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now},
do_check_with_parent_limiter(Need, Limiter2);
- Divisible andalso Tokens2 > 0 ->
+ Available when Divisible andalso Available > 0 ->
%% must be allocated here, because may be Need > Capacity
return_pause(Rate,
partial,
fun do_reset/2,
- Need - Tokens2,
+ Need - Available,
Limiter#{tokens := 0, lasttime := Now});
- true ->
+ _ ->
return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
end.
@@ -326,8 +332,8 @@ return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
{PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
--spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> retry_context(Limiter)
- when Limiter :: limiter().
+-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) ->
+ retry_context(Limiter) when Limiter :: limiter().
make_retry_context(Fun, Diff) ->
#{continuation => Fun, diff => Diff}.
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl
index b4a92596f..6e66645f3 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl
@@ -39,8 +39,7 @@
{ok, Pid :: pid(), State :: term()} |
{error, Reason :: term()}.
start(_StartType, _StartArgs) ->
- {ok, _} = Result = emqx_limiter_sup:start_link(),
- Result.
+ {ok, _} = emqx_limiter_sup:start_link().
%%--------------------------------------------------------------------
%% @private
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl
index 998bd9432..65b213485 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl
@@ -21,7 +21,7 @@
%% @end
%% API
--export([ new/0, new/1, get_limiter_by_names/2
+-export([ new/0, new/1, new/2, get_limiter_by_names/2
, add_new/3, update_by_name/3, set_retry_context/2
, check/3, retry/2, get_retry_context/1
, check_list/2, retry_list/2
@@ -30,7 +30,10 @@
-export_type([container/0, check_result/0]).
-type container() :: #{ limiter_type() => undefined | limiter()
- , retry_key() => undefined | retry_context() | future() %% the retry context of the limiter
+ %% the retry context of the limiter
+ , retry_key() => undefined
+ | retry_context()
+ | future()
, retry_ctx := undefined | any() %% the retry context of the container
}.
@@ -57,12 +60,18 @@ new() ->
%% @doc generate default data according to the type of limiter
-spec new(list(limiter_type())) -> container().
new(Types) ->
- get_limiter_by_names(Types, #{}).
+ new(Types, #{}).
+
+-spec new(list(limiter_type()),
+ #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
+new(Types, Names) ->
+ get_limiter_by_names(Types, Names).
%% @doc generate a container
%% according to the type of limiter and the bucket name configuration of the limiter
%% @end
--spec get_limiter_by_names(list(limiter_type()), #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
+-spec get_limiter_by_names(list(limiter_type()),
+ #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container().
get_limiter_by_names(Types, BucketNames) ->
Init = fun(Type, Acc) ->
Limiter = emqx_limiter_server:connect(Type, BucketNames),
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl
index 4571a59a3..9cf4bc1d0 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl
@@ -23,8 +23,9 @@
%% API
-export([ start_link/0, start_server/1, find_bucket/1
- , find_bucket/3, insert_bucket/2, insert_bucket/4
- , make_path/3, restart_server/1]).
+ , find_bucket/2, insert_bucket/2, insert_bucket/3
+ , make_path/2, restart_server/1
+ ]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -34,7 +35,6 @@
-type path() :: list(atom()).
-type limiter_type() :: emqx_limiter_schema:limiter_type().
--type zone_name() :: emqx_limiter_schema:zone_name().
-type bucket_name() :: emqx_limiter_schema:bucket_name().
%% counter record in ets table
@@ -57,10 +57,10 @@ start_server(Type) ->
restart_server(Type) ->
emqx_limiter_server_sup:restart(Type).
--spec find_bucket(limiter_type(), zone_name(), bucket_name()) ->
+-spec find_bucket(limiter_type(), bucket_name()) ->
{ok, bucket_ref()} | undefined.
-find_bucket(Type, Zone, BucketId) ->
- find_bucket(make_path(Type, Zone, BucketId)).
+find_bucket(Type, BucketName) ->
+ find_bucket(make_path(Type, BucketName)).
-spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
find_bucket(Path) ->
@@ -72,21 +72,19 @@ find_bucket(Path) ->
end.
-spec insert_bucket(limiter_type(),
- zone_name(),
bucket_name(),
bucket_ref()) -> boolean().
-insert_bucket(Type, Zone, BucketId, Bucket) ->
- inner_insert_bucket(make_path(Type, Zone, BucketId),
- Bucket).
+insert_bucket(Type, BucketName, Bucket) ->
+ inner_insert_bucket(make_path(Type, BucketName), Bucket).
-spec insert_bucket(path(), bucket_ref()) -> true.
insert_bucket(Path, Bucket) ->
inner_insert_bucket(Path, Bucket).
--spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
-make_path(Type, Name, BucketId) ->
- [Type, Name, BucketId].
+-spec make_path(limiter_type(), bucket_name()) -> path().
+make_path(Type, BucketName) ->
+ [Type | BucketName].
%%--------------------------------------------------------------------
%% @doc
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
index c7a5af24a..3f313e7e4 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
@@ -20,7 +20,8 @@
-export([ roots/0, fields/1, to_rate/1, to_capacity/1
, minimum_period/0, to_burst_rate/1, to_initial/1
- , namespace/0]).
+ , namespace/0, get_bucket_cfg_path/2
+ ]).
-define(KILOBYTE, 1024).
@@ -28,14 +29,14 @@
| message_in
| connection
| message_routing
- | shared.
+ | batch.
-type bucket_name() :: atom().
--type zone_name() :: atom().
-type rate() :: infinity | float().
-type burst_rate() :: 0 | float().
-type capacity() :: infinity | number(). %% the capacity of the token bucket
-type initial() :: non_neg_integer(). %% initial capacity of the token bucket
+-type bucket_path() :: list(atom()).
%% the processing strategy after the failure of the token request
-type failure_strategy() :: force %% Forced to pass
@@ -52,58 +53,80 @@
, capacity/0
, initial/0
, failure_strategy/0
+ , bucket_name/0
]).
--export_type([limiter_type/0, bucket_name/0, zone_name/0]).
+-export_type([limiter_type/0, bucket_path/0]).
-import(emqx_schema, [sc/2, map/2]).
+-define(UNIT_TIME_IN_MS, 1000).
namespace() -> limiter.
roots() -> [limiter].
fields(limiter) ->
- [ {bytes_in, sc(ref(limiter_opts), #{})}
- , {message_in, sc(ref(limiter_opts), #{})}
- , {connection, sc(ref(limiter_opts), #{})}
- , {message_routing, sc(ref(limiter_opts), #{})}
- , {shared, sc(ref(shared_limiter_opts),
- #{description =>
- <<"Some functions that do not need to use global and zone scope,"
- "them can shared use this type">>})}
+ [ {bytes_in, sc(ref(limiter_opts),
+ #{description =>
+ <<"The bytes_in limiter.
"
+ "It is used to limit the inbound bytes rate for this EMQX node."
+ "If the this limiter limit is reached,"
+ "the restricted client will be slow down even be hung for a while.">>
+ })}
+ , {message_in, sc(ref(limiter_opts),
+ #{description =>
+ <<"The message_in limiter.
"
+ "This is used to limit the inbound message numbers for this EMQX node"
+ "If the this limiter limit is reached,"
+ "the restricted client will be slow down even be hung for a while.">>
+ })}
+ , {connection, sc(ref(limiter_opts),
+ #{description =>
+ <<"The connection limiter.
"
+ "This is used to limit the connection rate for this EMQX node"
+ "If the this limiter limit is reached,"
+ "New connections will be refused"
+ >>})}
+ , {message_routing, sc(ref(limiter_opts),
+ #{description =>
+ <<"The message_routing limiter.
"
+ "This is used to limite the deliver rate for this EMQX node"
+ "If the this limiter limit is reached,"
+ "New publish will be refused"
+ >>
+ })}
+ , {batch, sc(ref(limiter_opts),
+ #{description => <<"The batch limiter.
"
+ "This is used for EMQX internal batch operation"
+ "e.g. limite the retainer's deliver rate"
+ >>
+ })}
];
fields(limiter_opts) ->
- [ {global, sc(ref(rate_burst), #{required => false})}
- , {zone, sc(map("zone name", ref(rate_burst)), #{required => false})}
- , {bucket, sc(map("bucket_id", ref(bucket)),
- #{desc => "Token bucket"})}
+ [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}
+ , {burst, sc(burst_rate(),
+ #{default => "0/0s",
+ desc => "The burst, This value is based on rate."
+ "this value + rate = the maximum limit that can be achieved when limiter burst"
+ })}
+ , {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
];
-fields(shared_limiter_opts) ->
- [{bucket, sc(map("bucket_id", ref(bucket)),
- #{desc => "Token bucket"})}
- ];
-
-fields(rate_burst) ->
- [ {rate, sc(rate(), #{})}
- , {burst, sc(burst_rate(), #{default => "0/0s"})}
- ];
-
-fields(bucket) ->
- [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})}
- , {aggregated, sc(ref(bucket_aggregated), #{})}
- , {per_client, sc(ref(client_bucket), #{})}
- ];
-
-fields(bucket_aggregated) ->
- [ {rate, sc(rate(), #{})}
- , {initial, sc(initial(), #{default => "0"})}
- , {capacity, sc(capacity(), #{})}
+fields(bucket_opts) ->
+ [ {rate, sc(rate(), #{desc => "The rate for this bucket"})}
+ , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})}
+ , {initial, sc(initial(), #{default => "0",
+ desc => "The initial number of tokens for this bucket"})}
+ , {per_client, sc(ref(client_bucket),
+ #{default => #{},
+ desc => "The rate limit for each user of the bucket,"
+ "this field is not required"
+ })}
];
fields(client_bucket) ->
- [ {rate, sc(rate(), #{})}
+ [ {rate, sc(rate(), #{default => "infinity"})}
, {initial, sc(initial(), #{default => "0"})}
%% low_water_mark add for emqx_channel and emqx_session
%% both modules consume first and then check
@@ -113,13 +136,14 @@ fields(client_bucket) ->
#{desc => "If the remaining tokens are lower than this value,
the check/consume will succeed, but it will be forced to wait for a short period of time.",
default => "0"})}
- , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket."})}
+ , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.",
+ default => "infinity"})}
, {divisible, sc(boolean(),
#{desc => "Is it possible to split the number of requested tokens?",
default => false})}
, {max_retry_time, sc(emqx_schema:duration(),
#{ desc => "The maximum retry time when acquire failed."
- , default => "5s"})}
+ , default => "10s"})}
, {failure_strategy, sc(failure_strategy(),
#{ desc => "The strategy when all the retries failed."
, default => force})}
@@ -132,6 +156,10 @@ minimum_period() ->
to_rate(Str) ->
to_rate(Str, true, false).
+-spec get_bucket_cfg_path(limiter_type(), bucket_name()) -> bucket_path().
+get_bucket_cfg_path(Type, BucketName) ->
+ [limiter, Type, bucket, BucketName].
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@@ -145,22 +173,38 @@ to_rate(Str, CanInfinity, CanZero) ->
case Tokens of
["infinity"] when CanInfinity ->
{ok, infinity};
- ["0", _] when CanZero ->
- {ok, 0}; %% for burst
- [Quota, Interval] ->
- {ok, Val} = to_capacity(Quota),
- case emqx_schema:to_duration_ms(Interval) of
- {ok, Ms} when Ms > 0 ->
- {ok, Val * minimum_period() / Ms};
- _ ->
- {error, Str}
- end;
+ [QuotaStr] -> %% if time unit is 1s, it can be omitted
+ {ok, Val} = to_capacity(QuotaStr),
+ check_capacity(Str, Val, CanZero,
+ fun(Quota) ->
+ {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
+ end);
+ [QuotaStr, Interval] ->
+ {ok, Val} = to_capacity(QuotaStr),
+ check_capacity(Str, Val, CanZero,
+ fun(Quota) ->
+ case emqx_schema:to_duration_ms(Interval) of
+ {ok, Ms} when Ms > 0 ->
+ {ok, Quota * minimum_period() / Ms};
+ _ ->
+ {error, Str}
+ end
+ end);
_ ->
{error, Str}
end.
+check_capacity(_Str, 0, true, _Cont) ->
+ {ok, 0};
+
+check_capacity(Str, 0, false, _Cont) ->
+ {error, Str};
+
+check_capacity(_Str, Quota, _CanZero, Cont) ->
+ Cont(Quota).
+
to_capacity(Str) ->
- Regex = "^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$",
+ Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$",
to_quota(Str, Regex).
to_initial(Str) ->
@@ -175,9 +219,9 @@ to_quota(Str, Regex) ->
Val = erlang:list_to_integer(Quota),
Unit2 = string:to_lower(Unit),
{ok, apply_unit(Unit2, Val)};
- {match, [Quota]} ->
+ {match, [Quota, ""]} ->
{ok, erlang:list_to_integer(Quota)};
- {match, []} ->
+ {match, ""} ->
{ok, infinity};
_ ->
{error, Str}
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
index c9984cd1a..84688ba38 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
@@ -34,26 +34,16 @@
terminate/2, code_change/3, format_status/2]).
-export([ start_link/1, connect/2, info/1
- , name/1, get_initial_val/1]).
+ , name/1, get_initial_val/1, update_config/1
+ ]).
-type root() :: #{ rate := rate() %% number of tokens generated per period
, burst := rate()
, period := pos_integer() %% token generation interval(second)
- , childs := list(node_id()) %% node children
, consumed := non_neg_integer()
}.
--type zone() :: #{ id := node_id()
- , name := zone_name()
- , rate := rate()
- , burst := rate()
- , obtained := non_neg_integer() %% number of tokens obtained
- , childs := list(node_id())
- }.
-
--type bucket() :: #{ id := node_id()
- , name := bucket_name()
- , zone := zone_name() %% pointer to zone node, use for burst
+-type bucket() :: #{ name := bucket_name()
, rate := rate()
, obtained := non_neg_integer()
, correction := emqx_limiter_decimal:zero_or_float() %% token correction value
@@ -62,19 +52,14 @@
, index := undefined | index()
}.
--type state() :: #{ root := undefined | root()
+-type state() :: #{ type := limiter_type()
+ , root := undefined | root()
+ , buckets := buckets()
, counter := undefined | counters:counters_ref() %% current counter to alloc
, index := index()
- , zones := #{zone_name() => node_id()}
- , buckets := list(node_id())
- , nodes := nodes()
- , type := limiter_type()
}.
--type node_id() :: pos_integer().
--type node_data() :: zone() | bucket().
--type nodes() :: #{node_id() => node_data()}.
--type zone_name() :: emqx_limiter_schema:zone_name().
+-type buckets() :: #{bucket_name() => bucket()}.
-type limiter_type() :: emqx_limiter_schema:limiter_type().
-type bucket_name() :: emqx_limiter_schema:bucket_name().
-type rate() :: decimal().
@@ -85,6 +70,7 @@
-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)).
-define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter
+-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
-export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@@ -95,33 +81,38 @@
%% API
%%--------------------------------------------------------------------
-spec connect(limiter_type(),
- bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter().
+ bucket_name() | #{limiter_type() => bucket_name() | undefined}) ->
+ emqx_htb_limiter:limiter().
+%% If no bucket path is set in config, there will be no limit
+connect(_Type, undefined) ->
+ emqx_htb_limiter:make_infinity_limiter();
+
connect(Type, BucketName) when is_atom(BucketName) ->
- Path = [limiter, Type, bucket, BucketName],
- case emqx:get_config(Path, undefined) of
+ CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName),
+ case emqx:get_config(CfgPath, undefined) of
undefined ->
- ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}),
+ ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}),
throw("bucket's config not found");
- #{zone := Zone,
- aggregated := #{rate := AggrRate, capacity := AggrSize},
+ #{rate := AggrRate,
+ capacity := AggrSize,
per_client := #{rate := CliRate, capacity := CliSize} = Cfg} ->
- case emqx_limiter_manager:find_bucket(Type, Zone, BucketName) of
+ case emqx_limiter_manager:find_bucket(Type, BucketName) of
{ok, Bucket} ->
if CliRate < AggrRate orelse CliSize < AggrSize ->
emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
Bucket =:= infinity ->
- emqx_htb_limiter:make_infinity_limiter(Cfg);
+ emqx_htb_limiter:make_infinity_limiter();
true ->
emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
end;
undefined ->
- ?SLOG(error, #{msg => "bucket_not_found", path => Path}),
+ ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}),
throw("invalid bucket")
end
end;
-connect(Type, Names) ->
- connect(Type, maps:get(Type, Names, default)).
+connect(Type, Paths) ->
+ connect(Type, maps:get(Type, Paths, undefined)).
-spec info(limiter_type()) -> state().
info(Type) ->
@@ -131,6 +122,10 @@ info(Type) ->
name(Type) ->
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
+-spec update_config(limiter_type()) -> ok.
+update_config(Type) ->
+ ?CALL(Type).
+
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
@@ -140,6 +135,7 @@ name(Type) ->
start_link(Type) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
+
%%--------------------------------------------------------------------
%%% gen_server callbacks
%%--------------------------------------------------------------------
@@ -156,17 +152,10 @@ start_link(Type) ->
{stop, Reason :: term()} |
ignore.
init([Type]) ->
- State = #{root => undefined,
- counter => undefined,
- index => 1,
- zones => #{},
- nodes => #{},
- buckets => [],
- type => Type},
- State2 = init_tree(Type, State),
- #{root := #{period := Perido}} = State2,
+ State = init_tree(Type),
+ #{root := #{period := Perido}} = State,
oscillate(Perido),
- {ok, State2}.
+ {ok, State}.
%%--------------------------------------------------------------------
%% @private
@@ -186,6 +175,10 @@ init([Type]) ->
handle_call(info, _From, State) ->
{reply, State, State};
+handle_call(update_config, _From, #{type := Type}) ->
+ NewState = init_tree(Type),
+ {reply, ok, NewState};
+
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
@@ -273,59 +266,38 @@ oscillate(Interval) ->
-spec oscillation(state()) -> state().
oscillation(#{root := #{rate := Flow,
period := Interval,
- childs := ChildIds,
consumed := Consumed} = Root,
- nodes := Nodes} = State) ->
+ buckets := Buckets} = State) ->
oscillate(Interval),
- Childs = get_ordered_childs(ChildIds, Nodes),
- {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
- maybe_burst(State#{nodes := Nodes2,
+ Ordereds = get_ordered_buckets(Buckets),
+ {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
+ maybe_burst(State#{buckets := Buckets2,
root := Root#{consumed := Consumed + Alloced}}).
%% @doc horizontal spread
--spec transverse(list(node_data()),
+-spec transverse(list(bucket()),
flow(),
non_neg_integer(),
- nodes()) -> {non_neg_integer(), nodes()}.
-transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 ->
- {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes),
- InFlow2 = sub(InFlow, NodeAlloced),
- Alloced2 = Alloced + NodeAlloced,
- transverse(T, InFlow2, Alloced2, Nodes2);
+ buckets()) -> {non_neg_integer(), buckets()}.
+transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
+ {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
+ InFlow2 = sub(InFlow, BucketAlloced),
+ Alloced2 = Alloced + BucketAlloced,
+ transverse(T, InFlow2, Alloced2, Buckets2);
-transverse(_, _, Alloced, Nodes) ->
- {Alloced, Nodes}.
+transverse(_, _, Alloced, Buckets) ->
+ {Alloced, Buckets}.
%% @doc vertical spread
--spec longitudinal(node_data(), flow(), nodes()) ->
- {non_neg_integer(), nodes()}.
-longitudinal(#{id := Id,
- rate := Rate,
- obtained := Obtained,
- childs := ChildIds} = Node, InFlow, Nodes) ->
- Flow = erlang:min(InFlow, Rate),
-
- if Flow > 0 ->
- Childs = get_ordered_childs(ChildIds, Nodes),
- {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
- if Alloced > 0 ->
- {Alloced,
- Nodes2#{Id => Node#{obtained := Obtained + Alloced}}};
- true ->
- %% childs are empty or all counter childs are full
- {0, Nodes2}
- end;
- true ->
- {0, Nodes}
- end;
-
-longitudinal(#{id := Id,
+-spec longitudinal(bucket(), flow(), buckets()) ->
+ {non_neg_integer(), buckets()}.
+longitudinal(#{name := Name,
rate := Rate,
capacity := Capacity,
counter := Counter,
index := Index,
- obtained := Obtained} = Node,
- InFlow, Nodes) when Counter =/= undefined ->
+ obtained := Obtained} = Bucket,
+ InFlow, Buckets) when Counter =/= undefined ->
Flow = erlang:min(InFlow, Rate),
ShouldAlloc =
@@ -345,204 +317,157 @@ longitudinal(#{id := Id,
%% XXX if capacity is infinity, and flow always > 0, the value in
%% counter will be overflow at some point in the future, do we need
%% to deal with this situation???
- {Inc, Node2} = emqx_limiter_correction:add(Available, Node),
+ {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
counters:add(Counter, Index, Inc),
{Inc,
- Nodes#{Id := Node2#{obtained := Obtained + Inc}}};
+ Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
_ ->
- {0, Nodes}
+ {0, Buckets}
end;
-longitudinal(_, _, Nodes) ->
- {0, Nodes}.
+longitudinal(_, _, Buckets) ->
+ {0, Buckets}.
--spec get_ordered_childs(list(node_id()), nodes()) -> list(node_data()).
-get_ordered_childs(Ids, Nodes) ->
- Childs = [maps:get(Id, Nodes) || Id <- Ids],
+-spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()).
+get_ordered_buckets(Buckets) when is_map(Buckets) ->
+ BucketList = maps:values(Buckets),
+ get_ordered_buckets(BucketList);
+get_ordered_buckets(Buckets) ->
%% sort by obtained, avoid node goes hungry
lists:sort(fun(#{obtained := A}, #{obtained := B}) ->
A < B
end,
- Childs).
+ Buckets).
-spec maybe_burst(state()) -> state().
maybe_burst(#{buckets := Buckets,
- zones := Zones,
- root := #{burst := Burst},
- nodes := Nodes} = State) when Burst > 0 ->
- %% find empty buckets and group by zone name
- GroupFun = fun(Id, Groups) ->
- #{counter := Counter,
- index := Index,
- zone := Zone} = maps:get(Id, Nodes),
- case counters:get(Counter, Index) of
- Any when Any =< 0 ->
- Group = maps:get(Zone, Groups, []),
- maps:put(Zone, [Id | Group], Groups);
- _ ->
- Groups
- end
- end,
+ root := #{burst := Burst}} = State) when Burst > 0 ->
+ Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined ->
+ case counters:get(Cnt, Idx) > 0 of
+ true ->
+ Acc;
+ false ->
+ [Bucket | Acc]
+ end;
+ (_Name, _Bucket, Acc) ->
+ Acc
+ end,
- case lists:foldl(GroupFun, #{}, Buckets) of
- Groups when map_size(Groups) > 0 ->
- %% remove the zone which don't support burst
- Filter = fun({Name, Childs}, Acc) ->
- ZoneId = maps:get(Name, Zones),
- #{burst := ZoneBurst} = Zone = maps:get(ZoneId, Nodes),
- case ZoneBurst > 0 of
- true ->
- [{Zone, Childs} | Acc];
- _ ->
- Acc
- end
- end,
-
- FilterL = lists:foldl(Filter, [], maps:to_list(Groups)),
- dispatch_burst(FilterL, State);
- _ ->
- State
- end;
+ Empties = maps:fold(Fold, [], Buckets),
+ dispatch_burst(Empties, Burst, State);
maybe_burst(State) ->
State.
--spec dispatch_burst(list({zone(), list(node_id())}), state()) -> state().
-dispatch_burst([], State) ->
+-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state().
+dispatch_burst([], _, State) ->
State;
-dispatch_burst(GroupL,
- #{root := #{burst := Burst},
- nodes := Nodes} = State) ->
- InFlow = Burst / erlang:length(GroupL),
- Dispatch = fun({Zone, Childs}, NodeAcc) ->
- #{id := ZoneId,
- burst := ZoneBurst,
- obtained := Obtained} = Zone,
+dispatch_burst(Empties, InFlow,
+ #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) ->
+ EachFlow = InFlow / erlang:length(Empties),
+ {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
+ State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
- case erlang:min(InFlow, ZoneBurst) of
- 0 -> NodeAcc;
- ZoneFlow ->
- EachFlow = ZoneFlow / erlang:length(Childs),
- {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
- Zone2 = Zone#{obtained := Obtained + Alloced},
- NodeAcc2#{ZoneId := Zone2}
- end
- end,
- State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
-
--spec dispatch_burst_to_buckets(list(node_id()),
- float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
-dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
- #{counter := Counter,
+-spec dispatch_burst_to_buckets(list(bucket()),
+ float(),
+ non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}.
+dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
+ #{name := Name,
+ counter := Counter,
index := Index,
- obtained := Obtained} = Bucket = maps:get(ChildId, Nodes),
+ obtained := Obtained} = Bucket,
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
counters:add(Counter, Index, Inc),
- Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}},
- dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2);
+ Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}},
+ dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2);
-dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
- {Alloced, Nodes}.
+dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
+ {Alloced, Buckets}.
--spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
-init_tree(Type, State) ->
- case emqx:get_config([limiter, Type]) of
- #{global := Global,
- zone := Zone,
- bucket := Bucket} -> ok;
- #{bucket := Bucket} ->
- Global = default_rate_burst_cfg(),
- Zone = #{default => default_rate_burst_cfg()},
- ok
- end,
- {Factor, Root} = make_root(Global, Zone),
- State2 = State#{root := Root},
- {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
- State4 = State3#{counter := counters:new(maps:size(Bucket),
- [write_concurrency])},
- make_bucket(maps:to_list(Bucket), Global, Zone, Factor, NodeId, [], State4).
+-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
+init_tree(Type) ->
+ State = #{ type => Type
+ , root => undefined
+ , counter => undefined
+ , index => 1
+ , buckets => #{}
+ },
--spec make_root(hocons:confg(), hocon:config()) -> {number(), root()}.
-make_root(#{rate := Rate, burst := Burst}, Zone) ->
- ZoneNum = maps:size(Zone),
- Childs = lists:seq(1, ZoneNum),
+ #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]),
+ {Factor, Root} = make_root(Cfg),
+ {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
+
+ State2 = State#{root := Root,
+ counter := counters:new(CounterNum, [write_concurrency])
+ },
+
+ lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
+
+-spec make_root(hocons:confg()) -> {number(), root()}.
+make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
+ {1, #{rate => Rate,
+ burst => Burst,
+ period => emqx_limiter_schema:minimum_period(),
+ consumed => 0}};
+
+make_root(#{rate := Rate, burst := Burst}) ->
MiniPeriod = emqx_limiter_schema:minimum_period(),
- if Rate >= 1 ->
- {1, #{rate => Rate,
- burst => Burst,
- period => MiniPeriod,
- childs => Childs,
- consumed => 0}};
- true ->
- Factor = 1 / Rate,
- {Factor, #{rate => 1,
- burst => Burst * Factor,
- period => erlang:floor(Factor * MiniPeriod),
- childs => Childs,
- consumed => 0}}
- end.
+ Factor = 1 / Rate,
+ {Factor, #{rate => 1,
+ burst => Burst * Factor,
+ period => erlang:floor(Factor * MiniPeriod),
+ consumed => 0}}.
-make_zone([{Name, ZoneCfg} | T], Factor, NodeId, State) ->
- #{rate := Rate, burst := Burst} = ZoneCfg,
- #{zones := Zones, nodes := Nodes} = State,
- Zone = #{id => NodeId,
- name => Name,
- rate => mul(Rate, Factor),
- burst => Burst,
- obtained => 0,
- childs => []},
- State2 = State#{zones := Zones#{Name => NodeId},
- nodes := Nodes#{NodeId => Zone}},
- make_zone(T, Factor, NodeId + 1, State2);
-
-make_zone([], _, NodeId, State2) ->
- {NodeId, State2}.
-
-make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Type} = State) ->
- #{zone := ZoneName,
- aggregated := Aggregated} = Conf,
- Path = emqx_limiter_manager:make_path(Type, ZoneName, Name),
- case get_counter_rate(Conf, Zone, Global) of
- infinity ->
- State2 = State,
- Rate = infinity,
- Capacity = infinity,
- Counter = undefined,
- Index = undefined,
- Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate),
- emqx_limiter_manager:insert_bucket(Path, Ref);
+make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
+ Path = emqx_limiter_manager:make_path(Type, Name),
+ case get_counter_rate(Conf, GlobalCfg) of
+ infinity ->
+ Rate = infinity,
+ Capacity = infinity,
+ Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate),
+ emqx_limiter_manager:insert_bucket(Path, Ref),
+ CounterNum2 = CounterNum,
+ InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
+ State#{buckets := Buckets#{BucketName => Bucket}}
+ end;
RawRate ->
- #{capacity := Capacity} = Aggregated,
- Initial = get_initial_val(Aggregated),
- {Counter, Index, State2} = alloc_counter(Path, RawRate, Initial, State),
- Rate = mul(RawRate, Factor)
+ #{capacity := Capacity} = Conf,
+ Initial = get_initial_val(Conf),
+ Rate = mul(RawRate, Factor),
+ CounterNum2 = CounterNum + 1,
+ InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
+ {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
+ Bucket2 = Bucket#{counter := Counter, index := Idx},
+ State2#{buckets := Buckets#{BucketName => Bucket2}}
+ end
end,
- Node = #{ id => Id
- , name => Name
- , zone => ZoneName
- , rate => Rate
- , obtained => 0
- , correction => 0
- , capacity => Capacity
- , counter => Counter
- , index => Index},
+ Bucket = #{ name => Name
+ , rate => Rate
+ , obtained => 0
+ , correction => 0
+ , capacity => Capacity
+ , counter => undefined
+ , index => undefined},
- State3 = add_zone_child(Id, Node, ZoneName, State2),
- make_bucket(T, Global, Zone, Factor, Id + 1, [Id | Buckets], State3);
+ DelayInit = ?CURRYING(Bucket, InitFun),
-make_bucket([], _, _, _, _, Buckets, State) ->
- State#{buckets := Buckets}.
+ make_bucket(T,
+ Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]);
+
+make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
+ {CounterNum, DelayBuckets}.
-spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
{counters:counters_ref(), pos_integer(), state()}.
alloc_counter(Path, Rate, Initial,
#{counter := Counter, index := Index} = State) ->
+
case emqx_limiter_manager:find_bucket(Path) of
{ok, #{counter := ECounter,
index := EIndex}} when ECounter =/= undefined ->
@@ -558,33 +483,16 @@ init_counter(Path, Counter, Index, Rate, Initial, State) ->
emqx_limiter_manager:insert_bucket(Path, Ref),
{Counter, Index, State}.
--spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state().
-add_zone_child(NodeId, Bucket, Name, #{zones := Zones, nodes := Nodes} = State) ->
- ZoneId = maps:get(Name, Zones),
- #{childs := Childs} = Zone = maps:get(ZoneId, Nodes),
- Nodes2 = Nodes#{ZoneId => Zone#{childs := [NodeId | Childs]},
- NodeId => Bucket},
- State#{nodes := Nodes2}.
%% @doc find first limited node
-get_counter_rate(#{zone := ZoneName,
- aggregated := Cfg}, ZoneCfg, Global) ->
- Zone = maps:get(ZoneName, ZoneCfg),
- Search = lists:search(fun(E) -> is_limited(E) end,
- [Cfg, Zone, Global]),
- case Search of
- {value, #{rate := Rate}} ->
- Rate;
- false ->
- infinity
- end.
+get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg)
+ when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity
+ Rate;
-is_limited(#{rate := Rate, capacity := Capacity}) ->
- Rate =/= infinity orelse Capacity =/= infinity;
-
-is_limited(#{rate := Rate}) ->
- Rate =/= infinity.
+get_counter_rate(_Cfg, #{rate := Rate}) ->
+ Rate.
+-spec get_initial_val(hocons:config()) -> decimal().
get_initial_val(#{initial := Initial,
rate := Rate,
capacity := Capacity}) ->
@@ -598,6 +506,3 @@ get_initial_val(#{initial := Initial,
true ->
0
end.
-
-default_rate_burst_cfg() ->
- #{rate => infinity, burst => 0}.
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
index 7f8d227ec..70332481a 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
@@ -46,6 +46,7 @@ start(Type) ->
Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec).
+%% XXX This is maybe a workaround, not so good
-spec restart(emqx_limiter_schema:limiter_type()) -> _.
restart(Type) ->
Id = emqx_limiter_server:name(Type),
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 592d6983a..35012acc7 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1197,7 +1197,7 @@ base_listener() ->
#{ default => 'default'
})}
, {"limiter",
- sc(map("ratelimit bucket's name", atom()), #{default => #{}})}
+ sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})}
].
%% utils
diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl
index d97b0197b..256a271b2 100644
--- a/apps/emqx/test/emqx_channel_SUITE.erl
+++ b/apps/emqx/test/emqx_channel_SUITE.erl
@@ -113,57 +113,32 @@ listener_mqtt_ws_conf() ->
listeners_conf() ->
#{tcp => #{default => listener_mqtt_tcp_conf()},
ws => #{default => listener_mqtt_ws_conf()}
- }.
+ }.
limiter_conf() ->
- #{bytes_in =>
- #{bucket =>
- #{default =>
- #{aggregated =>
- #{capacity => infinity,initial => 0,rate => infinity},
- per_client =>
- #{capacity => infinity,divisible => false,
- failure_strategy => force,initial => 0,low_water_mark => 0,
- max_retry_time => 5000,rate => infinity},
- zone => default}},
- global => #{burst => 0,rate => infinity},
- zone => #{default => #{burst => 0,rate => infinity}}},
- connection =>
- #{bucket =>
- #{default =>
- #{aggregated =>
- #{capacity => infinity,initial => 0,rate => infinity},
- per_client =>
- #{capacity => infinity,divisible => false,
- failure_strategy => force,initial => 0,low_water_mark => 0,
- max_retry_time => 5000,rate => infinity},
- zone => default}},
- global => #{burst => 0,rate => infinity},
- zone => #{default => #{burst => 0,rate => infinity}}},
- message_in =>
- #{bucket =>
- #{default =>
- #{aggregated =>
- #{capacity => infinity,initial => 0,rate => infinity},
- per_client =>
- #{capacity => infinity,divisible => false,
- failure_strategy => force,initial => 0,low_water_mark => 0,
- max_retry_time => 5000,rate => infinity},
- zone => default}},
- global => #{burst => 0,rate => infinity},
- zone => #{default => #{burst => 0,rate => infinity}}},
- message_routing =>
- #{bucket =>
- #{default =>
- #{aggregated =>
- #{capacity => infinity,initial => 0,rate => infinity},
- per_client =>
- #{capacity => infinity,divisible => false,
- failure_strategy => force,initial => 0,low_water_mark => 0,
- max_retry_time => 5000,rate => infinity},
- zone => default}},
- global => #{burst => 0,rate => infinity},
- zone => #{default => #{burst => 0,rate => infinity}}}}.
+ Make = fun() ->
+ #{bucket =>
+ #{default =>
+ #{capacity => infinity,
+ initial => 0,
+ rate => infinity,
+ per_client =>
+ #{capacity => infinity,divisible => false,
+ failure_strategy => force,initial => 0,low_water_mark => 0,
+ max_retry_time => 5000,rate => infinity
+ }
+ }
+ },
+ burst => 0,
+ rate => infinity
+ }
+ end,
+
+ lists:foldl(fun(Name, Acc) ->
+ Acc#{Name => Make()}
+ end,
+ #{},
+ [bytes_in, message_in, message_routing, connection, batch]).
stats_conf() ->
#{enable => true}.
@@ -232,7 +207,7 @@ end_per_suite(_Config) ->
init_per_testcase(TestCase, Config) ->
OldConf = set_test_listener_confs(),
emqx_common_test_helpers:start_apps([]),
- modify_limiter(TestCase, OldConf),
+ check_modify_limiter(TestCase),
[{config, OldConf}|Config].
end_per_testcase(_TestCase, Config) ->
@@ -240,18 +215,19 @@ end_per_testcase(_TestCase, Config) ->
emqx_common_test_helpers:stop_apps([]),
Config.
-modify_limiter(TestCase, NewConf) ->
+check_modify_limiter(TestCase) ->
Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2],
case lists:member(TestCase, Checks) of
true ->
- modify_limiter(NewConf);
+ modify_limiter();
_ ->
ok
end.
%% per_client 5/1s,5
%% aggregated 10/1s,10
-modify_limiter(#{limiter := Limiter} = NewConf) ->
+modify_limiter() ->
+ Limiter = emqx_config:get([limiter]),
#{message_routing := #{bucket := Bucket} = Routing} = Limiter,
#{default := #{per_client := Client} = Default} = Bucket,
Client2 = Client#{rate := 5,
@@ -259,16 +235,15 @@ modify_limiter(#{limiter := Limiter} = NewConf) ->
capacity := 5,
low_water_mark := 1},
Default2 = Default#{per_client := Client2,
- aggregated := #{rate => 10,
- initial => 0,
- capacity => 10
- }},
+ rate => 10,
+ initial => 0,
+ capacity => 10},
Bucket2 = Bucket#{default := Default2},
Routing2 = Routing#{bucket := Bucket2},
- NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}},
- emqx_config:put(NewConf2),
+ emqx_config:put([limiter], Limiter#{message_routing := Routing2}),
emqx_limiter_manager:restart_server(message_routing),
+ timer:sleep(100),
ok.
%%--------------------------------------------------------------------
@@ -1078,4 +1053,4 @@ session(InitFields) when is_map(InitFields) ->
quota() ->
emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()).
-limiter_cfg() -> #{}.
+limiter_cfg() -> #{message_routing => default}.
diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl
index ea2e31700..589e78e8e 100644
--- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl
+++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl
@@ -27,59 +27,37 @@
-define(BASE_CONF, <<"""
limiter {
bytes_in {
- global.rate = infinity
- zone.default.rate = infinity
bucket.default {
- zone = default
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = \"100MB/1s\"
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
message_in {
- global.rate = infinity
- zone.default.rate = infinity
bucket.default {
- zone = default
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
connection {
- global.rate = infinity
- zone.default.rate = infinity
bucket.default {
- zone = default
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
message_routing {
- global.rate = infinity
- zone.default.rate = infinity
bucket.default {
- zone = default
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
- shared {
+ batch {
bucket.retainer {
- aggregated.rate = infinity
- aggregated.capacity = infinity
- per_client.rate = infinity
- per_client.capacity = infinity
+ rate = infinity
+ capacity = infinity
}
}
}
@@ -97,6 +75,7 @@ limiter {
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(RATE(Rate), to_rate(Rate)).
-define(NOW, erlang:system_time(millisecond)).
+-define(CONST(X), fun(_) -> X end).
%%--------------------------------------------------------------------
%% Setups
@@ -231,12 +210,11 @@ t_low_water_mark(_) ->
with_per_client(default, Cfg, Case).
t_infinity_client(_) ->
- Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := infinity,
- capacity := infinity},
+ Fun = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := infinity,
+ capacity := infinity},
Cli2 = Cli#{rate := infinity, capacity := infinity},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
Client = connect(default),
@@ -247,14 +225,13 @@ t_infinity_client(_) ->
with_bucket(default, Fun, Case).
t_try_restore_agg(_) ->
- Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := 1,
- capacity := 200,
- initial := 50},
+ Fun = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := 1,
+ capacity := 200,
+ initial := 50},
Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true,
max_retry_time := 100, failure_strategy := force},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
Client = connect(default),
@@ -267,15 +244,14 @@ t_try_restore_agg(_) ->
with_bucket(default, Fun, Case).
t_short_board(_) ->
- Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := ?RATE("100/1s"),
- initial := 0,
- capacity := 100},
+ Fun = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := ?RATE("100/1s"),
+ initial := 0,
+ capacity := 100},
Cli2 = Cli#{rate := ?RATE("600/1s"),
capacity := 600,
initial := 600},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
Counter = counters:new(1, []),
@@ -286,15 +262,14 @@ t_short_board(_) ->
with_bucket(default, Fun, Case).
t_rate(_) ->
- Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := ?RATE("100/100ms"),
- initial := 0,
- capacity := infinity},
+ Fun = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
+ initial := 0,
+ capacity := infinity},
Cli2 = Cli#{rate := infinity,
capacity := infinity,
initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
Client = connect(default),
@@ -311,113 +286,74 @@ t_rate(_) ->
t_capacity(_) ->
Capacity = 600,
- Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := ?RATE("100/100ms"),
- initial := 0,
- capacity := 600},
- Cli2 = Cli#{rate := infinity,
- capacity := infinity,
- initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Fun = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := ?RATE("100/100ms"),
+ initial := 0,
+ capacity := 600},
+ Cli2 = Cli#{rate := infinity,
+ capacity := infinity,
+ initial := 0},
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
- Client = connect(default),
- timer:sleep(1000),
+ Client = connect(default),
+ timer:sleep(1000),
C1 = emqx_htb_limiter:available(Client),
?assertEqual(Capacity, C1, "test bucket capacity")
end,
with_bucket(default, Fun, Case).
-%%--------------------------------------------------------------------
-%% Test Cases Zone Level
-%%--------------------------------------------------------------------
-t_limit_zone_with_unlimit_bucket(_) ->
- ZoneMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s"),
- burst := ?RATE("60/1s")}
- end,
-
- Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := infinity,
- initial := 0,
- capacity := infinity},
- Cli2 = Cli#{rate := infinity,
- initial := 0,
- capacity := infinity,
- divisible := true},
- Bucket#{aggregated := Aggr2, per_client := Cli2}
- end,
-
- Case = fun() ->
- C1 = counters:new(1, []),
- start_client(b1, ?NOW + 2000, C1, 20),
- timer:sleep(2100),
- check_average_rate(C1, 2, 600)
- end,
-
- with_zone(default, ZoneMod, [{b1, Bucket}], Case).
-
-
%%--------------------------------------------------------------------
%% Test Cases Global Level
%%--------------------------------------------------------------------
-t_burst_and_fairness(_) ->
+t_collaborative_alloc(_) ->
GlobalMod = fun(Cfg) ->
- Cfg#{burst := ?RATE("60/1s")}
+ Cfg#{rate := ?RATE("600/1s")}
end,
- ZoneMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s"),
- burst := ?RATE("60/1s")}
- end,
-
- Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := ?RATE("500/1s"),
- initial := 0,
- capacity := 500},
- Cli2 = Cli#{rate := ?RATE("600/1s"),
- capacity := 600,
- initial := 600},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket1 = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := ?RATE("400/1s"),
+ initial := 0,
+ capacity := 600},
+ Cli2 = Cli#{rate := ?RATE("50"),
+ capacity := 100,
+ initial := 100},
+ Bucket2#{per_client := Cli2}
end,
+ Bucket2 = fun(Bucket) ->
+ Bucket2 = Bucket1(Bucket),
+ Bucket2#{rate := ?RATE("200/1s")}
+ end,
+
Case = fun() ->
C1 = counters:new(1, []),
C2 = counters:new(1, []),
start_client(b1, ?NOW + 2000, C1, 20),
start_client(b2, ?NOW + 2000, C2, 30),
timer:sleep(2100),
- check_average_rate(C1, 2, 330),
- check_average_rate(C2, 2, 330)
+ check_average_rate(C1, 2, 300),
+ check_average_rate(C2, 2, 300)
end,
with_global(GlobalMod,
- default,
- ZoneMod,
- [{b1, Bucket}, {b2, Bucket}],
+ [{b1, Bucket1}, {b2, Bucket2}],
Case).
t_burst(_) ->
GlobalMod = fun(Cfg) ->
- Cfg#{burst := ?RATE("60/1s")}
+ Cfg#{rate := ?RATE("200/1s"),
+ burst := ?RATE("400/1s")}
end,
- ZoneMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("60/1s"),
- burst := ?RATE("60/1s")}
- end,
-
- Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := ?RATE("500/1s"),
- initial := 0,
- capacity := 500},
- Cli2 = Cli#{rate := ?RATE("600/1s"),
- capacity := 600,
+ Bucket = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := ?RATE("200/1s"),
+ initial := 0,
+ capacity := 200},
+ Cli2 = Cli#{rate := ?RATE("50/1s"),
+ capacity := 200,
divisible := true},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
@@ -430,180 +366,39 @@ t_burst(_) ->
timer:sleep(2100),
Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
- in_range(Total / 2, 30)
+ in_range(Total / 2, 300)
end,
with_global(GlobalMod,
- default,
- ZoneMod,
[{b1, Bucket}, {b2, Bucket}, {b3, Bucket}],
Case).
-
t_limit_global_with_unlimit_other(_) ->
GlobalMod = fun(Cfg) ->
Cfg#{rate := ?RATE("600/1s")}
end,
- ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end,
-
- Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := infinity,
- initial := 0,
- capacity := infinity},
- Cli2 = Cli#{rate := infinity,
- capacity := infinity,
- initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2}
+ Bucket = fun(#{per_client := Cli} = Bucket) ->
+ Bucket2 = Bucket#{rate := infinity,
+ initial := 0,
+ capacity := infinity},
+ Cli2 = Cli#{rate := infinity,
+ capacity := infinity,
+ initial := 0},
+ Bucket2#{per_client := Cli2}
end,
Case = fun() ->
- C1 = counters:new(1, []),
- start_client(b1, ?NOW + 2000, C1, 20),
- timer:sleep(2100),
- check_average_rate(C1, 2, 600)
+ C1 = counters:new(1, []),
+ start_client(b1, ?NOW + 2000, C1, 20),
+ timer:sleep(2100),
+ check_average_rate(C1, 2, 600)
end,
with_global(GlobalMod,
- default,
- ZoneMod,
[{b1, Bucket}],
Case).
-t_multi_zones(_) ->
- GlobalMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s")}
- end,
-
- Zone1 = fun(Cfg) ->
- Cfg#{rate := ?RATE("400/1s")}
- end,
-
- Zone2 = fun(Cfg) ->
- Cfg#{rate := ?RATE("500/1s")}
- end,
-
- Bucket = fun(Zone, Rate) ->
- fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := infinity,
- initial := 0,
- capacity := infinity},
- Cli2 = Cli#{rate := Rate,
- capacity := infinity,
- initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2,
- zone := Zone}
- end
- end,
-
- Case = fun() ->
- C1 = counters:new(1, []),
- C2 = counters:new(1, []),
- start_client(b1, ?NOW + 2000, C1, 25),
- start_client(b2, ?NOW + 2000, C2, 20),
- timer:sleep(2100),
- check_average_rate(C1, 2, 300),
- check_average_rate(C2, 2, 300)
- end,
-
- with_global(GlobalMod,
- [z1, z2],
- [Zone1, Zone2],
- [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
- Case).
-
-%% because the simulated client will try to reach the maximum rate
-%% when divisiable = true, a large number of divided tokens will be generated
-%% so this is not an accurate test
-t_multi_zones_with_divisible(_) ->
- GlobalMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s")}
- end,
-
- Zone1 = fun(Cfg) ->
- Cfg#{rate := ?RATE("400/1s")}
- end,
-
- Zone2 = fun(Cfg) ->
- Cfg#{rate := ?RATE("500/1s")}
- end,
-
- Bucket = fun(Zone, Rate) ->
- fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := Rate,
- initial := 0,
- capacity := infinity},
- Cli2 = Cli#{rate := Rate,
- divisible := true,
- capacity := infinity,
- initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2,
- zone := Zone}
- end
- end,
-
- Case = fun() ->
- C1 = counters:new(1, []),
- C2 = counters:new(1, []),
- start_client(b1, ?NOW + 2000, C1, 25),
- start_client(b2, ?NOW + 2000, C2, 20),
- timer:sleep(2100),
- check_average_rate(C1, 2, 300),
- check_average_rate(C2, 2, 300)
- end,
-
- with_global(GlobalMod,
- [z1, z2],
- [Zone1, Zone2],
- [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}],
- Case).
-
-t_zone_hunger_and_fair(_) ->
- GlobalMod = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s")}
- end,
-
- Zone1 = fun(Cfg) ->
- Cfg#{rate := ?RATE("600/1s")}
- end,
-
- Zone2 = fun(Cfg) ->
- Cfg#{rate := ?RATE("50/1s")}
- end,
-
- Bucket = fun(Zone, Rate) ->
- fun(#{aggregated := Aggr, per_client := Cli} = Bucket) ->
- Aggr2 = Aggr#{rate := infinity,
- initial := 0,
- capacity := infinity},
- Cli2 = Cli#{rate := Rate,
- capacity := infinity,
- initial := 0},
- Bucket#{aggregated := Aggr2,
- per_client := Cli2,
- zone := Zone}
- end
- end,
-
- Case = fun() ->
- C1 = counters:new(1, []),
- C2 = counters:new(1, []),
- start_client(b1, ?NOW + 2000, C1, 20),
- start_client(b2, ?NOW + 2000, C2, 20),
- timer:sleep(2100),
- check_average_rate(C1, 2, 550),
- check_average_rate(C2, 2, 50)
- end,
-
- with_global(GlobalMod,
- [z1, z2],
- [Zone1, Zone2],
- [{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}],
- Case).
-
%%--------------------------------------------------------------------
%% Test Cases container
%%--------------------------------------------------------------------
@@ -626,7 +421,8 @@ t_check_container(_) ->
capacity := 1000}
end,
Case = fun() ->
- C1 = emqx_limiter_container:new([message_routing]),
+ C1 = emqx_limiter_container:new([message_routing],
+ #{message_routing => default}),
{ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
{pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
timer:sleep(Pause),
@@ -663,9 +459,7 @@ t_limiter_server(_) ->
?assertMatch(#{root := _,
counter := _,
index := _,
- zones := _,
buckets := _,
- nodes := _,
type := message_routing}, State),
Name = emqx_limiter_server:name(message_routing),
@@ -675,6 +469,32 @@ t_limiter_server(_) ->
ok = emqx_limiter_server:format_status(normal, ok),
ok.
+t_decimal(_) ->
+ ?assertEqual(infinity, emqx_limiter_decimal:add(infinity, 3)),
+ ?assertEqual(5, emqx_limiter_decimal:add(2, 3)),
+ ?assertEqual(infinity, emqx_limiter_decimal:sub(infinity, 3)),
+ ?assertEqual(-1, emqx_limiter_decimal:sub(2, 3)),
+ ?assertEqual(infinity, emqx_limiter_decimal:mul(infinity, 3)),
+ ?assertEqual(6, emqx_limiter_decimal:mul(2, 3)),
+ ?assertEqual(infinity, emqx_limiter_decimal:floor_div(infinity, 3)),
+ ?assertEqual(2, emqx_limiter_decimal:floor_div(7, 3)),
+ ok.
+
+t_schema_unit(_) ->
+ M = emqx_limiter_schema,
+ ?assertEqual(limiter, M:namespace()),
+ ?assertEqual({ok, infinity}, M:to_rate(" infinity ")),
+ ?assertMatch({ok, _}, M:to_rate("100")),
+ ?assertMatch({error, _}, M:to_rate("0")),
+ ?assertMatch({ok, _}, M:to_rate("100/10s")),
+ ?assertMatch({error, _}, M:to_rate("100/10x")),
+ ?assertEqual({ok, infinity}, M:to_capacity("infinity")),
+ ?assertEqual({ok, 100}, M:to_capacity("100")),
+ ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
+ ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
+ ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
+ ok.
+
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
@@ -752,7 +572,6 @@ client_try_check(Need, #client{counter = Counter,
end
end.
-
%% XXX not a god test, because client's rate maybe bigger than global rate
%% so if client' rate = infinity
%% client's divisible should be true or capacity must be bigger than number of each consume
@@ -769,25 +588,17 @@ to_rate(Str) ->
{ok, Rate} = emqx_limiter_schema:to_rate(Str),
Rate.
-with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) ->
- Path = [limiter, message_routing],
- #{global := Global} = Cfg = emqx_config:get(Path),
- Cfg2 = Cfg#{global := Modifier(Global)},
- with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case).
+with_global(Modifier, BuckeTemps, Case) ->
+ Fun = fun(Cfg) ->
+ #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg),
+ Fun = fun({Name, BMod}, Acc) ->
+ Acc#{Name => BMod(BucketCfg)}
+ end,
+ Buckets = lists:foldl(Fun, #{}, BuckeTemps),
+ Cfg2#{bucket := Buckets}
+ end,
-with_zone(Name, Modifier, Buckets, Case) ->
- Path = [limiter, message_routing],
- Cfg = emqx_config:get(Path),
- with_zone(Cfg, Name, Modifier, Buckets, Case).
-
-with_zone(Cfg, Name, Modifier, Buckets, Case) ->
- Path = [limiter, message_routing],
- #{zone := ZoneCfgs,
- bucket := BucketCfgs} = Cfg,
- ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs),
- BucketCfgs2 = apply_modifier(Buckets, BucketCfgs),
- Cfg2 = Cfg#{zone := ZoneCfgs2, bucket := BucketCfgs2},
- with_config(Path, fun(_) -> Cfg2 end, Case).
+ with_config([limiter, message_routing], Fun, Case).
with_bucket(Bucket, Modifier, Case) ->
Path = [limiter, message_routing, bucket, Bucket],
@@ -802,8 +613,8 @@ with_config(Path, Modifier, Case) ->
NewCfg = Modifier(Cfg),
ct:pal("test with config:~p~n", [NewCfg]),
emqx_config:put(Path, NewCfg),
- emqx_limiter_manager:restart_server(message_routing),
- timer:sleep(100),
+ emqx_limiter_server:update_config(message_routing),
+ timer:sleep(500),
DelayReturn = delay_return(Case),
emqx_config:put(Path, Cfg),
DelayReturn().
diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl
index a2f5c3502..455cf3e43 100644
--- a/apps/emqx/test/emqx_ws_connection_SUITE.erl
+++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl
@@ -405,16 +405,30 @@ t_handle_timeout_emit_stats(_) ->
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
t_ensure_rate_limit(_) ->
+ %% XXX In the future, limiter should provide API for config update
+ Path = [limiter, bytes_in, bucket, default, per_client],
+ PerClient = emqx_config:get(Path),
+ {ok, Rate}= emqx_limiter_schema:to_rate("50MB"),
+ emqx_config:put(Path, PerClient#{rate := Rate}),
+ emqx_limiter_server:update_config(bytes_in),
+ timer:sleep(100),
+
Limiter = init_limiter(),
St = st(#{limiter => Limiter}),
- {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), %% must bigger than value in emqx_ratelimit_SUITE
+
+ %% must bigger than value in emqx_ratelimit_SUITE
+ {ok, Need} = emqx_limiter_schema:to_capacity("1GB"),
St1 = ?ws_conn:check_limiter([{Need, bytes_in}],
[],
fun(_, _, S) -> S end,
[],
St),
?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
- ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)).
+ ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
+
+ emqx_config:put(Path, PerClient),
+ emqx_limiter_server:update_config(bytes_in),
+ timer:sleep(100).
t_parse_incoming(_) ->
{Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()),
@@ -558,7 +572,7 @@ ws_client(State) ->
ct:fail(ws_timeout)
end.
-limiter_cfg() -> #{}.
+limiter_cfg() -> #{bytes_in => default, message_in => default}.
init_limiter() ->
emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index 6121e39b6..093c6fcc4 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) ->
#{type => string, example => <<"force">>};
typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0M">>};
+typename_to_spec("bucket_name()", _Mod) ->
+ #{type => string, example => <<"retainer">>};
typename_to_spec(Name, Mod) ->
Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod),
diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf
index e561bc52f..051f44940 100644
--- a/apps/emqx_retainer/etc/emqx_retainer.conf
+++ b/apps/emqx_retainer/etc/emqx_retainer.conf
@@ -57,10 +57,15 @@ retainer {
## Default: 0
batch_deliver_number = 0
- ## deliver limiter bucket
+ ## The rate limiter name for retained messages delivery.
+ ## In order to avoid delivering too many messages to the client at once, which may cause the client
+ ## to block or crash, or message dropped due to exceeding the size of the message queue. We need
+ ## to specify a rate limiter for the retained messages delivery to the client.
##
- ## Default: 0s
- limiter_bucket_name = retainer
+ ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
+ ## You can remove this field if you don't want any limit
+ ## Default: retainer
+ batch_deliver_limiter = retainer
}
## Maximum retained message size.
diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
index d57c49799..6eb5457b7 100644
--- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
@@ -85,8 +85,8 @@ start_link(Pool, Id) ->
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
- Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
- Limiter = emqx_limiter_server:connect(shared, Bucket),
+ BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
+ Limiter = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
%%--------------------------------------------------------------------
@@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) ->
- Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
- Limiter = emqx_limiter_server:connect(shared, Bucket),
+ BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
+ Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) ->
@@ -198,14 +198,15 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) ->
Mod = emqx_retainer:get_backend_module(),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false ->
- {ok, Result} = Mod:read_message(Context, Topic),
+ {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
deliver(Result, Context, Pid, Topic, undefined, Limiter);
true ->
- {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
+ {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
end.
--spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
+-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
+ {ok, limiter()}.
deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
{ok, Limiter};
diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl
index 395f0e363..12189c737 100644
--- a/apps/emqx_retainer/src/emqx_retainer_schema.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl
@@ -29,7 +29,7 @@ fields(mnesia_config) ->
fields(flow_control) ->
[ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
, {batch_deliver_number, sc(range(0, 1000), 0)}
- , {limiter_bucket_name, sc(atom(), retainer)}
+ , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)}
].
%%--------------------------------------------------------------------
diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
index a9405373e..9e957f214 100644
--- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
@@ -36,7 +36,7 @@ retainer {
flow_control {
batch_read_number = 0
batch_deliver_number = 0
- limiter_bucket_name = retainer
+ batch_deliver_limiter = retainer
}
backend {
type = built_in_database
@@ -281,12 +281,11 @@ t_stop_publish_clear_msg(_) ->
ok = emqtt:disconnect(C1).
t_flow_control(_) ->
- #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]),
- RetainerCfg2 = RetainerCfg#{
- per_client := PerClient#{
- rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
- capacity := 1}},
- emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2),
+ #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
+ RetainerCfg2 = RetainerCfg#{per_client :=
+ PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
+ capacity := 1}},
+ emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
emqx_limiter_manager:restart_server(shared),
timer:sleep(500),
@@ -296,7 +295,7 @@ t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
- <<"limiter_bucket_name">> => retainer}}),
+ <<"batch_deliver_limiter">> => retainer}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(
@@ -326,7 +325,7 @@ t_flow_control(_) ->
ok = emqtt:disconnect(C1),
%% recover the limiter
- emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg),
+ emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
emqx_limiter_manager:restart_server(shared),
timer:sleep(500),