Merge pull request #10376 from lafirest/fix/simplify_limiter_cfg
fix(limiter): simplify the configuration of the limiter
This commit is contained in:
commit
03ea04827d
|
@ -182,10 +182,8 @@
|
||||||
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
||||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||||
|
|
||||||
%% use macro to do compile time limiter's type check
|
-define(LIMITER_BYTES_IN, bytes).
|
||||||
-define(LIMITER_BYTES_IN, bytes_in).
|
-define(LIMITER_MESSAGE_IN, messages).
|
||||||
-define(LIMITER_MESSAGE_IN, message_in).
|
|
||||||
-define(EMPTY_QUEUE, {[], []}).
|
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer(
|
-dialyzer(
|
||||||
|
|
|
@ -139,7 +139,8 @@ make_token_bucket_limiter(Cfg, Bucket) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
tokens => emqx_limiter_server:get_initial_val(Cfg),
|
tokens => emqx_limiter_server:get_initial_val(Cfg),
|
||||||
lasttime => ?NOW,
|
lasttime => ?NOW,
|
||||||
bucket => Bucket
|
bucket => Bucket,
|
||||||
|
capacity => emqx_limiter_schema:calc_capacity(Cfg)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%@doc create a limiter server's reference
|
%%@doc create a limiter server's reference
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
new/3,
|
new/3,
|
||||||
|
infinity_bucket/0,
|
||||||
check/3,
|
check/3,
|
||||||
try_restore/2,
|
try_restore/2,
|
||||||
available/1
|
available/1
|
||||||
|
@ -58,6 +59,10 @@ new(Counter, Index, Rate) ->
|
||||||
rate => Rate
|
rate => Rate
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-spec infinity_bucket() -> bucket_ref().
|
||||||
|
infinity_bucket() ->
|
||||||
|
infinity.
|
||||||
|
|
||||||
%% @doc check tokens
|
%% @doc check tokens
|
||||||
-spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) ->
|
-spec check(pos_integer(), bucket_ref(), Disivisble :: boolean()) ->
|
||||||
HasToken ::
|
HasToken ::
|
||||||
|
|
|
@ -31,20 +31,20 @@
|
||||||
get_bucket_cfg_path/2,
|
get_bucket_cfg_path/2,
|
||||||
desc/1,
|
desc/1,
|
||||||
types/0,
|
types/0,
|
||||||
infinity_value/0
|
calc_capacity/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(KILOBYTE, 1024).
|
-define(KILOBYTE, 1024).
|
||||||
-define(BUCKET_KEYS, [
|
-define(BUCKET_KEYS, [
|
||||||
{bytes_in, bucket_infinity},
|
{bytes, bucket_infinity},
|
||||||
{message_in, bucket_infinity},
|
{messages, bucket_infinity},
|
||||||
{connection, bucket_limit},
|
{connection, bucket_limit},
|
||||||
{message_routing, bucket_infinity}
|
{message_routing, bucket_infinity}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type limiter_type() ::
|
-type limiter_type() ::
|
||||||
bytes_in
|
bytes
|
||||||
| message_in
|
| messages
|
||||||
| connection
|
| connection
|
||||||
| message_routing
|
| message_routing
|
||||||
%% internal limiter for unclassified resources
|
%% internal limiter for unclassified resources
|
||||||
|
@ -90,14 +90,17 @@
|
||||||
|
|
||||||
namespace() -> limiter.
|
namespace() -> limiter.
|
||||||
|
|
||||||
roots() -> [limiter].
|
roots() ->
|
||||||
|
[{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
|
||||||
|
|
||||||
fields(limiter) ->
|
fields(limiter) ->
|
||||||
[
|
[
|
||||||
{Type,
|
{Type,
|
||||||
?HOCON(?R_REF(node_opts), #{
|
?HOCON(?R_REF(node_opts), #{
|
||||||
desc => ?DESC(Type),
|
desc => ?DESC(Type),
|
||||||
default => #{}
|
default => #{},
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
aliases => alias_of_type(Type)
|
||||||
})}
|
})}
|
||||||
|| Type <- types()
|
|| Type <- types()
|
||||||
] ++
|
] ++
|
||||||
|
@ -107,6 +110,7 @@ fields(limiter) ->
|
||||||
?R_REF(client_fields),
|
?R_REF(client_fields),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(client),
|
desc => ?DESC(client),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
default => maps:from_list([
|
default => maps:from_list([
|
||||||
{erlang:atom_to_binary(Type), #{}}
|
{erlang:atom_to_binary(Type), #{}}
|
||||||
|| Type <- types()
|
|| Type <- types()
|
||||||
|
@ -124,30 +128,50 @@ fields(node_opts) ->
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
fields(client_fields) ->
|
fields(client_fields) ->
|
||||||
[
|
client_fields(types(), #{default => #{}});
|
||||||
{Type,
|
|
||||||
?HOCON(?R_REF(client_opts), #{
|
|
||||||
desc => ?DESC(Type),
|
|
||||||
default => #{}
|
|
||||||
})}
|
|
||||||
|| Type <- types()
|
|
||||||
];
|
|
||||||
fields(bucket_infinity) ->
|
fields(bucket_infinity) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})},
|
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"infinity">>})},
|
||||||
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"infinity">>})},
|
{burst,
|
||||||
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})}
|
?HOCON(capacity(), #{
|
||||||
|
desc => ?DESC(capacity),
|
||||||
|
default => <<"0">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
aliases => [capacity]
|
||||||
|
})},
|
||||||
|
{initial,
|
||||||
|
?HOCON(initial(), #{
|
||||||
|
default => <<"0">>,
|
||||||
|
desc => ?DESC(initial),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields(bucket_limit) ->
|
fields(bucket_limit) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})},
|
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => <<"1000/s">>})},
|
||||||
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => <<"1000">>})},
|
{burst,
|
||||||
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})}
|
?HOCON(capacity(), #{
|
||||||
|
desc => ?DESC(burst),
|
||||||
|
default => <<"0">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
aliases => [capacity]
|
||||||
|
})},
|
||||||
|
{initial,
|
||||||
|
?HOCON(initial(), #{
|
||||||
|
default => <<"0">>,
|
||||||
|
desc => ?DESC(initial),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields(client_opts) ->
|
fields(client_opts) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})},
|
{rate, ?HOCON(rate(), #{default => <<"infinity">>, desc => ?DESC(rate)})},
|
||||||
{initial, ?HOCON(initial(), #{default => <<"0">>, desc => ?DESC(initial)})},
|
{initial,
|
||||||
|
?HOCON(initial(), #{
|
||||||
|
default => <<"0">>,
|
||||||
|
desc => ?DESC(initial),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
})},
|
||||||
%% low_watermark add for emqx_channel and emqx_session
|
%% low_watermark add for emqx_channel and emqx_session
|
||||||
%% both modules consume first and then check
|
%% both modules consume first and then check
|
||||||
%% so we need to use this value to prevent excessive consumption
|
%% so we need to use this value to prevent excessive consumption
|
||||||
|
@ -157,20 +181,24 @@ fields(client_opts) ->
|
||||||
initial(),
|
initial(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(low_watermark),
|
desc => ?DESC(low_watermark),
|
||||||
default => <<"0">>
|
default => <<"0">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{capacity,
|
{burst,
|
||||||
?HOCON(capacity(), #{
|
?HOCON(capacity(), #{
|
||||||
desc => ?DESC(client_bucket_capacity),
|
desc => ?DESC(burst),
|
||||||
default => <<"infinity">>
|
default => <<"0">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
aliases => [capacity]
|
||||||
})},
|
})},
|
||||||
{divisible,
|
{divisible,
|
||||||
?HOCON(
|
?HOCON(
|
||||||
boolean(),
|
boolean(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(divisible),
|
desc => ?DESC(divisible),
|
||||||
default => false
|
default => false,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{max_retry_time,
|
{max_retry_time,
|
||||||
|
@ -178,7 +206,8 @@ fields(client_opts) ->
|
||||||
emqx_schema:duration(),
|
emqx_schema:duration(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(max_retry_time),
|
desc => ?DESC(max_retry_time),
|
||||||
default => <<"10s">>
|
default => <<"10s">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{failure_strategy,
|
{failure_strategy,
|
||||||
|
@ -186,16 +215,18 @@ fields(client_opts) ->
|
||||||
failure_strategy(),
|
failure_strategy(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(failure_strategy),
|
desc => ?DESC(failure_strategy),
|
||||||
default => force
|
default => force,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(listener_fields) ->
|
fields(listener_fields) ->
|
||||||
bucket_fields(?BUCKET_KEYS, listener_client_fields);
|
composite_bucket_fields(?BUCKET_KEYS, listener_client_fields);
|
||||||
fields(listener_client_fields) ->
|
fields(listener_client_fields) ->
|
||||||
client_fields(?BUCKET_KEYS);
|
{Types, _} = lists:unzip(?BUCKET_KEYS),
|
||||||
|
client_fields(Types, #{required => false});
|
||||||
fields(Type) ->
|
fields(Type) ->
|
||||||
bucket_field(Type).
|
simple_bucket_field(Type).
|
||||||
|
|
||||||
desc(limiter) ->
|
desc(limiter) ->
|
||||||
"Settings for the rate limiter.";
|
"Settings for the rate limiter.";
|
||||||
|
@ -230,19 +261,14 @@ get_bucket_cfg_path(Type, BucketName) ->
|
||||||
[limiter, Type, bucket, BucketName].
|
[limiter, Type, bucket, BucketName].
|
||||||
|
|
||||||
types() ->
|
types() ->
|
||||||
[bytes_in, message_in, connection, message_routing, internal].
|
[bytes, messages, connection, message_routing, internal].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
calc_capacity(#{rate := infinity}) ->
|
||||||
%% Internal functions
|
infinity;
|
||||||
%%--------------------------------------------------------------------
|
calc_capacity(#{burst := infinity}) ->
|
||||||
|
infinity;
|
||||||
%% `infinity` to `infinity_value` rules:
|
calc_capacity(#{rate := Rate, burst := Burst}) ->
|
||||||
%% 1. all infinity capacity will change to infinity_value
|
erlang:floor(1000 * Rate / default_period()) + Burst.
|
||||||
%% 2. if the rate of global and bucket both are `infinity`,
|
|
||||||
%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2`
|
|
||||||
infinity_value() ->
|
|
||||||
%% 1 TB
|
|
||||||
1099511627776.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -335,7 +361,7 @@ to_quota(Str, Regex) ->
|
||||||
{match, [Quota, ""]} ->
|
{match, [Quota, ""]} ->
|
||||||
{ok, erlang:list_to_integer(Quota)};
|
{ok, erlang:list_to_integer(Quota)};
|
||||||
{match, ""} ->
|
{match, ""} ->
|
||||||
{ok, infinity_value()};
|
{ok, infinity};
|
||||||
_ ->
|
_ ->
|
||||||
{error, Str}
|
{error, Str}
|
||||||
end
|
end
|
||||||
|
@ -350,7 +376,8 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
|
||||||
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
|
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
|
||||||
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
|
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
|
||||||
|
|
||||||
bucket_field(Type) when is_atom(Type) ->
|
%% A bucket with only one type
|
||||||
|
simple_bucket_field(Type) when is_atom(Type) ->
|
||||||
fields(bucket_infinity) ++
|
fields(bucket_infinity) ++
|
||||||
[
|
[
|
||||||
{client,
|
{client,
|
||||||
|
@ -358,16 +385,22 @@ bucket_field(Type) when is_atom(Type) ->
|
||||||
?R_REF(?MODULE, client_opts),
|
?R_REF(?MODULE, client_opts),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(client),
|
desc => ?DESC(client),
|
||||||
required => false
|
required => false,
|
||||||
|
importance => importance_of_type(Type),
|
||||||
|
aliases => alias_of_type(Type)
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
bucket_fields(Types, ClientRef) ->
|
|
||||||
|
%% A bucket with multi types
|
||||||
|
composite_bucket_fields(Types, ClientRef) ->
|
||||||
[
|
[
|
||||||
{Type,
|
{Type,
|
||||||
?HOCON(?R_REF(?MODULE, Opts), #{
|
?HOCON(?R_REF(?MODULE, Opts), #{
|
||||||
desc => ?DESC(?MODULE, Type),
|
desc => ?DESC(?MODULE, Type),
|
||||||
required => false
|
required => false,
|
||||||
|
importance => importance_of_type(Type),
|
||||||
|
aliases => alias_of_type(Type)
|
||||||
})}
|
})}
|
||||||
|| {Type, Opts} <- Types
|
|| {Type, Opts} <- Types
|
||||||
] ++
|
] ++
|
||||||
|
@ -382,12 +415,29 @@ bucket_fields(Types, ClientRef) ->
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
client_fields(Types) ->
|
client_fields(Types, Meta) ->
|
||||||
[
|
[
|
||||||
{Type,
|
{Type,
|
||||||
?HOCON(?R_REF(client_opts), #{
|
?HOCON(?R_REF(client_opts), Meta#{
|
||||||
desc => ?DESC(Type),
|
desc => ?DESC(Type),
|
||||||
required => false
|
importance => importance_of_type(Type),
|
||||||
|
aliases => alias_of_type(Type)
|
||||||
})}
|
})}
|
||||||
|| {Type, _} <- Types
|
|| Type <- Types
|
||||||
].
|
].
|
||||||
|
|
||||||
|
importance_of_type(interval) ->
|
||||||
|
?IMPORTANCE_HIDDEN;
|
||||||
|
importance_of_type(message_routing) ->
|
||||||
|
?IMPORTANCE_HIDDEN;
|
||||||
|
importance_of_type(connection) ->
|
||||||
|
?IMPORTANCE_HIDDEN;
|
||||||
|
importance_of_type(_) ->
|
||||||
|
?DEFAULT_IMPORTANCE.
|
||||||
|
|
||||||
|
alias_of_type(messages) ->
|
||||||
|
[message_in];
|
||||||
|
alias_of_type(bytes) ->
|
||||||
|
[bytes_in];
|
||||||
|
alias_of_type(_) ->
|
||||||
|
[].
|
||||||
|
|
|
@ -118,17 +118,24 @@ connect(_Id, _Type, undefined) ->
|
||||||
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
connect(Id, Type, Cfg) ->
|
connect(Id, Type, Cfg) ->
|
||||||
case find_limiter_cfg(Type, Cfg) of
|
case find_limiter_cfg(Type, Cfg) of
|
||||||
{undefined, _} ->
|
{_ClientCfg, undefined, _NodeCfg} ->
|
||||||
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
|
{#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} ->
|
||||||
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
|
{ClientCfg, #{rate := infinity}, #{rate := infinity}} ->
|
||||||
|
{ok,
|
||||||
|
emqx_htb_limiter:make_token_bucket_limiter(
|
||||||
|
ClientCfg, emqx_limiter_bucket_ref:infinity_bucket()
|
||||||
|
)};
|
||||||
{
|
{
|
||||||
#{
|
#{rate := CliRate} = ClientCfg,
|
||||||
rate := BucketRate,
|
#{rate := BucketRate} = BucketCfg,
|
||||||
capacity := BucketSize
|
_
|
||||||
},
|
|
||||||
#{rate := CliRate, capacity := CliSize} = ClientCfg
|
|
||||||
} ->
|
} ->
|
||||||
case emqx_limiter_manager:find_bucket(Id, Type) of
|
case emqx_limiter_manager:find_bucket(Id, Type) of
|
||||||
{ok, Bucket} ->
|
{ok, Bucket} ->
|
||||||
|
BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg),
|
||||||
|
CliSize = emqx_limiter_schema:calc_capacity(ClientCfg),
|
||||||
{ok,
|
{ok,
|
||||||
if
|
if
|
||||||
CliRate < BucketRate orelse CliSize < BucketSize ->
|
CliRate < BucketRate orelse CliSize < BucketSize ->
|
||||||
|
@ -493,12 +500,14 @@ make_root(#{rate := Rate, burst := Burst}) ->
|
||||||
produced => 0.0
|
produced => 0.0
|
||||||
}.
|
}.
|
||||||
|
|
||||||
do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) ->
|
do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
|
||||||
|
State;
|
||||||
|
do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
|
||||||
case maps:get(Id, Buckets, undefined) of
|
case maps:get(Id, Buckets, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
make_bucket(Id, Cfg, State);
|
make_bucket(Id, Cfg, State);
|
||||||
Bucket ->
|
Bucket ->
|
||||||
Bucket2 = Bucket#{rate := Rate, capacity := Capacity},
|
Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)},
|
||||||
State#{buckets := Buckets#{Id := Bucket2}}
|
State#{buckets := Buckets#{Id := Bucket2}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -509,7 +518,7 @@ make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) ->
|
||||||
});
|
});
|
||||||
make_bucket(
|
make_bucket(
|
||||||
Id,
|
Id,
|
||||||
#{rate := Rate, capacity := Capacity} = Cfg,
|
#{rate := Rate} = Cfg,
|
||||||
#{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
|
#{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
|
||||||
) ->
|
) ->
|
||||||
NewIndex = Index + 1,
|
NewIndex = Index + 1,
|
||||||
|
@ -519,7 +528,7 @@ make_bucket(
|
||||||
rate => Rate,
|
rate => Rate,
|
||||||
obtained => Initial,
|
obtained => Initial,
|
||||||
correction => 0,
|
correction => 0,
|
||||||
capacity => Capacity,
|
capacity => emqx_limiter_schema:calc_capacity(Cfg),
|
||||||
counter => Counter,
|
counter => Counter,
|
||||||
index => NewIndex
|
index => NewIndex
|
||||||
},
|
},
|
||||||
|
@ -541,19 +550,14 @@ do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
|
||||||
get_initial_val(
|
get_initial_val(
|
||||||
#{
|
#{
|
||||||
initial := Initial,
|
initial := Initial,
|
||||||
rate := Rate,
|
rate := Rate
|
||||||
capacity := Capacity
|
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
%% initial will nevner be infinity(see the emqx_limiter_schema)
|
|
||||||
InfVal = emqx_limiter_schema:infinity_value(),
|
|
||||||
if
|
if
|
||||||
Initial > 0 ->
|
Initial > 0 ->
|
||||||
Initial;
|
Initial;
|
||||||
Rate =/= infinity ->
|
Rate =/= infinity ->
|
||||||
erlang:min(Rate, Capacity);
|
Rate;
|
||||||
Capacity =/= infinity andalso Capacity =/= InfVal ->
|
|
||||||
Capacity;
|
|
||||||
true ->
|
true ->
|
||||||
0
|
0
|
||||||
end.
|
end.
|
||||||
|
@ -568,11 +572,12 @@ call(Type, Msg) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
find_limiter_cfg(Type, #{rate := _} = Cfg) ->
|
find_limiter_cfg(Type, #{rate := _} = Cfg) ->
|
||||||
{Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))};
|
{find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)};
|
||||||
find_limiter_cfg(Type, Cfg) ->
|
find_limiter_cfg(Type, Cfg) ->
|
||||||
{
|
{
|
||||||
|
find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)),
|
||||||
maps:get(Type, Cfg, undefined),
|
maps:get(Type, Cfg, undefined),
|
||||||
find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined))
|
find_node_cfg(Type)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
find_client_cfg(Type, BucketCfg) ->
|
find_client_cfg(Type, BucketCfg) ->
|
||||||
|
@ -585,3 +590,6 @@ merge_client_cfg(NodeCfg, undefined) ->
|
||||||
NodeCfg;
|
NodeCfg;
|
||||||
merge_client_cfg(NodeCfg, BucketCfg) ->
|
merge_client_cfg(NodeCfg, BucketCfg) ->
|
||||||
maps:merge(NodeCfg, BucketCfg).
|
maps:merge(NodeCfg, BucketCfg).
|
||||||
|
|
||||||
|
find_node_cfg(Type) ->
|
||||||
|
emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}).
|
||||||
|
|
|
@ -121,8 +121,8 @@
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
-define(LIMITER_BYTES_IN, bytes_in).
|
-define(LIMITER_BYTES_IN, bytes).
|
||||||
-define(LIMITER_MESSAGE_IN, message_in).
|
-define(LIMITER_MESSAGE_IN, messages).
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer({nowarn_function, [websocket_init/1]}).
|
-dialyzer({nowarn_function, [websocket_init/1]}).
|
||||||
|
|
|
@ -162,8 +162,7 @@ limiter_conf() ->
|
||||||
Make = fun() ->
|
Make = fun() ->
|
||||||
#{
|
#{
|
||||||
burst => 0,
|
burst => 0,
|
||||||
rate => infinity,
|
rate => infinity
|
||||||
capacity => infinity
|
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -172,7 +171,7 @@ limiter_conf() ->
|
||||||
Acc#{Name => Make()}
|
Acc#{Name => Make()}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
[bytes_in, message_in, message_routing, connection, internal]
|
[bytes, messages, message_routing, connection, internal]
|
||||||
).
|
).
|
||||||
|
|
||||||
stats_conf() ->
|
stats_conf() ->
|
||||||
|
@ -1258,7 +1257,7 @@ limiter_cfg() ->
|
||||||
Client = #{
|
Client = #{
|
||||||
rate => 5,
|
rate => 5,
|
||||||
initial => 0,
|
initial => 0,
|
||||||
capacity => 5,
|
burst => 0,
|
||||||
low_watermark => 1,
|
low_watermark => 1,
|
||||||
divisible => false,
|
divisible => false,
|
||||||
max_retry_time => timer:seconds(5),
|
max_retry_time => timer:seconds(5),
|
||||||
|
@ -1270,7 +1269,7 @@ limiter_cfg() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
bucket_cfg() ->
|
bucket_cfg() ->
|
||||||
#{rate => 10, initial => 0, capacity => 10}.
|
#{rate => 10, initial => 0, burst => 0}.
|
||||||
|
|
||||||
add_bucket() ->
|
add_bucket() ->
|
||||||
emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()).
|
emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()).
|
||||||
|
|
|
@ -427,7 +427,7 @@ t_ensure_rate_limit(_) ->
|
||||||
fun(_, Client) -> {pause, 3000, undefined, Client} end
|
fun(_, Client) -> {pause, 3000, undefined, Client} end
|
||||||
),
|
),
|
||||||
{ok, State2} = emqx_connection:check_limiter(
|
{ok, State2} = emqx_connection:check_limiter(
|
||||||
[{1000, bytes_in}],
|
[{1000, bytes}],
|
||||||
[],
|
[],
|
||||||
WhenOk,
|
WhenOk,
|
||||||
[],
|
[],
|
||||||
|
@ -703,31 +703,29 @@ handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
|
||||||
-define(LIMITER_ID, 'tcp:default').
|
-define(LIMITER_ID, 'tcp:default').
|
||||||
|
|
||||||
init_limiter() ->
|
init_limiter() ->
|
||||||
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()).
|
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes, messages], limiter_cfg()).
|
||||||
|
|
||||||
limiter_cfg() ->
|
limiter_cfg() ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
|
||||||
Cfg = bucket_cfg(),
|
Cfg = bucket_cfg(),
|
||||||
Client = #{
|
Client = #{
|
||||||
rate => Infinity,
|
rate => infinity,
|
||||||
initial => 0,
|
initial => 0,
|
||||||
capacity => Infinity,
|
burst => 0,
|
||||||
low_watermark => 1,
|
low_watermark => 1,
|
||||||
divisible => false,
|
divisible => false,
|
||||||
max_retry_time => timer:seconds(5),
|
max_retry_time => timer:seconds(5),
|
||||||
failure_strategy => force
|
failure_strategy => force
|
||||||
},
|
},
|
||||||
#{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
|
#{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}.
|
||||||
|
|
||||||
bucket_cfg() ->
|
bucket_cfg() ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
#{rate => infinity, initial => 0, burst => 0}.
|
||||||
#{rate => Infinity, initial => 0, capacity => Infinity}.
|
|
||||||
|
|
||||||
add_bucket() ->
|
add_bucket() ->
|
||||||
Cfg = bucket_cfg(),
|
Cfg = bucket_cfg(),
|
||||||
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
|
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes, Cfg),
|
||||||
emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
|
emqx_limiter_server:add_bucket(?LIMITER_ID, messages, Cfg).
|
||||||
|
|
||||||
del_bucket() ->
|
del_bucket() ->
|
||||||
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
|
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes),
|
||||||
emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).
|
emqx_limiter_server:del_bucket(?LIMITER_ID, messages).
|
||||||
|
|
|
@ -72,7 +72,7 @@ t_consume(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
rate := 100,
|
rate := 100,
|
||||||
capacity := 100,
|
burst := 0,
|
||||||
initial := 100,
|
initial := 100,
|
||||||
max_retry_time := 1000,
|
max_retry_time := 1000,
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
|
@ -89,7 +89,7 @@ t_retry(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
rate := 50,
|
rate := 50,
|
||||||
capacity := 200,
|
burst := 150,
|
||||||
initial := 0,
|
initial := 0,
|
||||||
max_retry_time := 1000,
|
max_retry_time := 1000,
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
|
@ -109,7 +109,7 @@ t_restore(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
rate := 1,
|
rate := 1,
|
||||||
capacity := 200,
|
burst := 199,
|
||||||
initial := 50,
|
initial := 50,
|
||||||
max_retry_time := 100,
|
max_retry_time := 100,
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
|
@ -129,7 +129,7 @@ t_max_retry_time(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
rate := 1,
|
rate := 1,
|
||||||
capacity := 1,
|
burst := 0,
|
||||||
max_retry_time := 500,
|
max_retry_time := 500,
|
||||||
failure_strategy := drop
|
failure_strategy := drop
|
||||||
}
|
}
|
||||||
|
@ -139,8 +139,12 @@ t_max_retry_time(_) ->
|
||||||
Begin = ?NOW,
|
Begin = ?NOW,
|
||||||
Result = emqx_htb_limiter:consume(101, Client),
|
Result = emqx_htb_limiter:consume(101, Client),
|
||||||
?assertMatch({drop, _}, Result),
|
?assertMatch({drop, _}, Result),
|
||||||
Time = ?NOW - Begin,
|
End = ?NOW,
|
||||||
?assert(Time >= 500 andalso Time < 550)
|
Time = End - Begin,
|
||||||
|
?assert(
|
||||||
|
Time >= 500 andalso Time < 550,
|
||||||
|
lists:flatten(io_lib:format("Begin:~p, End:~p, Time:~p~n", [Begin, End, Time]))
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
with_per_client(Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
|
@ -150,7 +154,7 @@ t_divisible(_) ->
|
||||||
divisible := true,
|
divisible := true,
|
||||||
rate := ?RATE("1000/1s"),
|
rate := ?RATE("1000/1s"),
|
||||||
initial := 600,
|
initial := 600,
|
||||||
capacity := 600
|
burst := 0
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun(BucketCfg) ->
|
Case = fun(BucketCfg) ->
|
||||||
|
@ -176,7 +180,7 @@ t_low_watermark(_) ->
|
||||||
low_watermark := 400,
|
low_watermark := 400,
|
||||||
rate := ?RATE("1000/1s"),
|
rate := ?RATE("1000/1s"),
|
||||||
initial := 1000,
|
initial := 1000,
|
||||||
capacity := 1000
|
burst := 0
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun(BucketCfg) ->
|
Case = fun(BucketCfg) ->
|
||||||
|
@ -201,8 +205,7 @@ t_infinity_client(_) ->
|
||||||
Fun = fun(Cfg) -> Cfg end,
|
Fun = fun(Cfg) -> Cfg end,
|
||||||
Case = fun(Cfg) ->
|
Case = fun(Cfg) ->
|
||||||
Client = connect(Cfg),
|
Client = connect(Cfg),
|
||||||
InfVal = emqx_limiter_schema:infinity_value(),
|
?assertMatch(infinity, Client),
|
||||||
?assertMatch(#{bucket := #{rate := InfVal}}, Client),
|
|
||||||
Result = emqx_htb_limiter:check(100000, Client),
|
Result = emqx_htb_limiter:check(100000, Client),
|
||||||
?assertEqual({ok, Client}, Result)
|
?assertEqual({ok, Client}, Result)
|
||||||
end,
|
end,
|
||||||
|
@ -212,12 +215,12 @@ t_try_restore_agg(_) ->
|
||||||
Fun = fun(#{client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := 1,
|
rate := 1,
|
||||||
capacity := 200,
|
burst := 199,
|
||||||
initial := 50
|
initial := 50
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
capacity := infinity,
|
burst := infinity,
|
||||||
divisible := true,
|
divisible := true,
|
||||||
max_retry_time := 100,
|
max_retry_time := 100,
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
|
@ -239,11 +242,11 @@ t_short_board(_) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/1s"),
|
rate := ?RATE("100/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := 100
|
burst := 0
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := ?RATE("600/1s"),
|
rate := ?RATE("600/1s"),
|
||||||
capacity := 600,
|
burst := 0,
|
||||||
initial := 600
|
initial := 600
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
|
@ -261,46 +264,45 @@ t_rate(_) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/100ms"),
|
rate := ?RATE("100/100ms"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := infinity
|
burst := infinity
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
capacity := infinity,
|
burst := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun(Cfg) ->
|
Case = fun(Cfg) ->
|
||||||
|
Time = 1000,
|
||||||
Client = connect(Cfg),
|
Client = connect(Cfg),
|
||||||
Ts1 = erlang:system_time(millisecond),
|
|
||||||
C1 = emqx_htb_limiter:available(Client),
|
C1 = emqx_htb_limiter:available(Client),
|
||||||
timer:sleep(1000),
|
timer:sleep(1100),
|
||||||
Ts2 = erlang:system_time(millisecond),
|
|
||||||
C2 = emqx_htb_limiter:available(Client),
|
C2 = emqx_htb_limiter:available(Client),
|
||||||
ShouldInc = floor((Ts2 - Ts1) / 100) * 100,
|
ShouldInc = floor(Time / 100) * 100,
|
||||||
Inc = C2 - C1,
|
Inc = C2 - C1,
|
||||||
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
|
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
|
||||||
end,
|
end,
|
||||||
with_bucket(Fun, Case).
|
with_bucket(Fun, Case).
|
||||||
|
|
||||||
t_capacity(_) ->
|
t_capacity(_) ->
|
||||||
Capacity = 600,
|
Capacity = 1200,
|
||||||
Fun = fun(#{client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/100ms"),
|
rate := ?RATE("100/100ms"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := 600
|
burst := 200
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
capacity := infinity,
|
burst := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun(Cfg) ->
|
Case = fun(Cfg) ->
|
||||||
Client = connect(Cfg),
|
Client = connect(Cfg),
|
||||||
timer:sleep(1000),
|
timer:sleep(1500),
|
||||||
C1 = emqx_htb_limiter:available(Client),
|
C1 = emqx_htb_limiter:available(Client),
|
||||||
?assertEqual(Capacity, C1, "test bucket capacity")
|
?assertEqual(Capacity, C1, "test bucket capacity")
|
||||||
end,
|
end,
|
||||||
|
@ -318,11 +320,11 @@ t_collaborative_alloc(_) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("400/1s"),
|
rate := ?RATE("400/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := 600
|
burst := 200
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := ?RATE("50"),
|
rate := ?RATE("50"),
|
||||||
capacity := 100,
|
burst := 50,
|
||||||
initial := 100
|
initial := 100
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
|
@ -363,11 +365,11 @@ t_burst(_) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("200/1s"),
|
rate := ?RATE("200/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := 200
|
burst := 0
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := ?RATE("50/1s"),
|
rate := ?RATE("50/1s"),
|
||||||
capacity := 200,
|
burst := 150,
|
||||||
divisible := true
|
divisible := true
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
|
@ -401,11 +403,11 @@ t_limit_global_with_unlimit_other(_) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
initial := 0,
|
initial := 0,
|
||||||
capacity := infinity
|
burst := infinity
|
||||||
},
|
},
|
||||||
Cli2 = Cli#{
|
Cli2 = Cli#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
capacity := infinity,
|
burst := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
|
@ -414,7 +416,7 @@ t_limit_global_with_unlimit_other(_) ->
|
||||||
Case = fun() ->
|
Case = fun() ->
|
||||||
C1 = counters:new(1, []),
|
C1 = counters:new(1, []),
|
||||||
start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
|
start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
|
||||||
timer:sleep(2100),
|
timer:sleep(2200),
|
||||||
check_average_rate(C1, 2, 600)
|
check_average_rate(C1, 2, 600)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -432,7 +434,7 @@ t_check_container(_) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
rate := ?RATE("1000/1s"),
|
rate := ?RATE("1000/1s"),
|
||||||
initial := 1000,
|
initial := 1000,
|
||||||
capacity := 1000
|
burst := 0
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun(#{client := Client} = BucketCfg) ->
|
Case = fun(#{client := Client} = BucketCfg) ->
|
||||||
|
@ -565,7 +567,7 @@ t_schema_unit(_) ->
|
||||||
?assertMatch({error, _}, M:to_rate("100MB/1")),
|
?assertMatch({error, _}, M:to_rate("100MB/1")),
|
||||||
?assertMatch({error, _}, M:to_rate("100/10x")),
|
?assertMatch({error, _}, M:to_rate("100/10x")),
|
||||||
|
|
||||||
?assertEqual({ok, emqx_limiter_schema:infinity_value()}, M:to_capacity("infinity")),
|
?assertEqual({ok, infinity}, M:to_capacity("infinity")),
|
||||||
?assertEqual({ok, 100}, M:to_capacity("100")),
|
?assertEqual({ok, 100}, M:to_capacity("100")),
|
||||||
?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
|
?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),
|
||||||
?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
|
?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")),
|
||||||
|
@ -748,17 +750,16 @@ connect(Name, Cfg) ->
|
||||||
Limiter.
|
Limiter.
|
||||||
|
|
||||||
make_limiter_cfg() ->
|
make_limiter_cfg() ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
|
||||||
Client = #{
|
Client = #{
|
||||||
rate => Infinity,
|
rate => infinity,
|
||||||
initial => 0,
|
initial => 0,
|
||||||
capacity => Infinity,
|
burst => infinity,
|
||||||
low_watermark => 0,
|
low_watermark => 0,
|
||||||
divisible => false,
|
divisible => false,
|
||||||
max_retry_time => timer:seconds(5),
|
max_retry_time => timer:seconds(5),
|
||||||
failure_strategy => force
|
failure_strategy => force
|
||||||
},
|
},
|
||||||
#{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
|
#{client => Client, rate => infinity, initial => 0, burst => infinity}.
|
||||||
|
|
||||||
add_bucket(Cfg) ->
|
add_bucket(Cfg) ->
|
||||||
add_bucket(?MODULE, Cfg).
|
add_bucket(?MODULE, Cfg).
|
||||||
|
|
|
@ -509,16 +509,16 @@ t_handle_timeout_emit_stats(_) ->
|
||||||
t_ensure_rate_limit(_) ->
|
t_ensure_rate_limit(_) ->
|
||||||
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
||||||
Limiter = init_limiter(#{
|
Limiter = init_limiter(#{
|
||||||
bytes_in => bucket_cfg(),
|
bytes => bucket_cfg(),
|
||||||
message_in => bucket_cfg(),
|
messages => bucket_cfg(),
|
||||||
client => #{bytes_in => client_cfg(Rate)}
|
client => #{bytes => client_cfg(Rate)}
|
||||||
}),
|
}),
|
||||||
St = st(#{limiter => Limiter}),
|
St = st(#{limiter => Limiter}),
|
||||||
|
|
||||||
%% must bigger than value in emqx_ratelimit_SUITE
|
%% must bigger than value in emqx_ratelimit_SUITE
|
||||||
{ok, Need} = emqx_limiter_schema:to_capacity("1GB"),
|
{ok, Need} = emqx_limiter_schema:to_capacity("1GB"),
|
||||||
St1 = ?ws_conn:check_limiter(
|
St1 = ?ws_conn:check_limiter(
|
||||||
[{Need, bytes_in}],
|
[{Need, bytes}],
|
||||||
[],
|
[],
|
||||||
fun(_, _, S) -> S end,
|
fun(_, _, S) -> S end,
|
||||||
[],
|
[],
|
||||||
|
@ -699,23 +699,21 @@ init_limiter() ->
|
||||||
init_limiter(limiter_cfg()).
|
init_limiter(limiter_cfg()).
|
||||||
|
|
||||||
init_limiter(LimiterCfg) ->
|
init_limiter(LimiterCfg) ->
|
||||||
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg).
|
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes, messages], LimiterCfg).
|
||||||
|
|
||||||
limiter_cfg() ->
|
limiter_cfg() ->
|
||||||
Cfg = bucket_cfg(),
|
Cfg = bucket_cfg(),
|
||||||
Client = client_cfg(),
|
Client = client_cfg(),
|
||||||
#{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
|
#{bytes => Cfg, messages => Cfg, client => #{bytes => Client, messages => Client}}.
|
||||||
|
|
||||||
client_cfg() ->
|
client_cfg() ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
client_cfg(infinity).
|
||||||
client_cfg(Infinity).
|
|
||||||
|
|
||||||
client_cfg(Rate) ->
|
client_cfg(Rate) ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
|
||||||
#{
|
#{
|
||||||
rate => Rate,
|
rate => Rate,
|
||||||
initial => 0,
|
initial => 0,
|
||||||
capacity => Infinity,
|
burst => 0,
|
||||||
low_watermark => 1,
|
low_watermark => 1,
|
||||||
divisible => false,
|
divisible => false,
|
||||||
max_retry_time => timer:seconds(5),
|
max_retry_time => timer:seconds(5),
|
||||||
|
@ -723,14 +721,13 @@ client_cfg(Rate) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
bucket_cfg() ->
|
bucket_cfg() ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
#{rate => infinity, initial => 0, burst => 0}.
|
||||||
#{rate => Infinity, initial => 0, capacity => Infinity}.
|
|
||||||
|
|
||||||
add_bucket() ->
|
add_bucket() ->
|
||||||
Cfg = bucket_cfg(),
|
Cfg = bucket_cfg(),
|
||||||
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
|
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes, Cfg),
|
||||||
emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
|
emqx_limiter_server:add_bucket(?LIMITER_ID, messages, Cfg).
|
||||||
|
|
||||||
del_bucket() ->
|
del_bucket() ->
|
||||||
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
|
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes),
|
||||||
emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).
|
emqx_limiter_server:del_bucket(?LIMITER_ID, messages).
|
||||||
|
|
|
@ -758,23 +758,22 @@ with_conf(ConfMod, Case) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
make_limiter_cfg(Rate) ->
|
make_limiter_cfg(Rate) ->
|
||||||
Infinity = emqx_limiter_schema:infinity_value(),
|
|
||||||
Client = #{
|
Client = #{
|
||||||
rate => Rate,
|
rate => Rate,
|
||||||
initial => 0,
|
initial => 0,
|
||||||
capacity => Infinity,
|
burst => 0,
|
||||||
low_watermark => 1,
|
low_watermark => 1,
|
||||||
divisible => false,
|
divisible => false,
|
||||||
max_retry_time => timer:seconds(5),
|
max_retry_time => timer:seconds(5),
|
||||||
failure_strategy => force
|
failure_strategy => force
|
||||||
},
|
},
|
||||||
#{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
|
#{client => Client, rate => Rate, initial => 0, burst => 0}.
|
||||||
|
|
||||||
make_limiter_json(Rate) ->
|
make_limiter_json(Rate) ->
|
||||||
Client = #{
|
Client = #{
|
||||||
<<"rate">> => Rate,
|
<<"rate">> => Rate,
|
||||||
<<"initial">> => 0,
|
<<"initial">> => 0,
|
||||||
<<"capacity">> => <<"infinity">>,
|
<<"burst">> => <<"0">>,
|
||||||
<<"low_watermark">> => 0,
|
<<"low_watermark">> => 0,
|
||||||
<<"divisible">> => <<"false">>,
|
<<"divisible">> => <<"false">>,
|
||||||
<<"max_retry_time">> => <<"5s">>,
|
<<"max_retry_time">> => <<"5s">>,
|
||||||
|
@ -784,5 +783,5 @@ make_limiter_json(Rate) ->
|
||||||
<<"client">> => Client,
|
<<"client">> => Client,
|
||||||
<<"rate">> => <<"infinity">>,
|
<<"rate">> => <<"infinity">>,
|
||||||
<<"initial">> => 0,
|
<<"initial">> => 0,
|
||||||
<<"capacity">> => <<"infinity">>
|
<<"burst">> => <<"0">>
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
Simplify the configuration of the limiter feature and optimize some codes
|
||||||
|
- Rename `message_in` to `messages`
|
||||||
|
- Rename `bytes_in` to `bytes`
|
||||||
|
- Use `burst` instead of `capacity`
|
||||||
|
- Hide non-importance fields
|
||||||
|
- Optimize limiter instances in different rate settings
|
|
@ -33,28 +33,6 @@ emqx_limiter_schema {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
client_bucket_capacity {
|
|
||||||
desc {
|
|
||||||
en: """The capacity of per user."""
|
|
||||||
zh: """每个使用者的令牌容量上限"""
|
|
||||||
}
|
|
||||||
label: {
|
|
||||||
en: """Capacity"""
|
|
||||||
zh: """容量"""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
capacity {
|
|
||||||
desc {
|
|
||||||
en: """The capacity of this token bucket."""
|
|
||||||
zh: """该令牌桶的容量"""
|
|
||||||
}
|
|
||||||
label: {
|
|
||||||
en: """Capacity"""
|
|
||||||
zh: """容量"""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
low_watermark {
|
low_watermark {
|
||||||
desc {
|
desc {
|
||||||
en: """If the remaining tokens are lower than this value,
|
en: """If the remaining tokens are lower than this value,
|
||||||
|
@ -152,30 +130,30 @@ Once the limit is reached, new connections will be refused"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
message_in {
|
messages {
|
||||||
desc {
|
desc {
|
||||||
en: """The message in limiter.
|
en: """The `messages` limiter.
|
||||||
This is used to limit the inbound message numbers for this EMQX node
|
This is used to limit the inbound message numbers for this EMQX node
|
||||||
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
||||||
zh: """流入速率控制器。
|
zh: """流入速率控制器。
|
||||||
这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
|
这个用来控制当前节点上的消息流入速率,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: """Message In"""
|
en: """Messages"""
|
||||||
zh: """消息流入速率"""
|
zh: """消息流入速率"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes_in {
|
bytes {
|
||||||
desc {
|
desc {
|
||||||
en: """The bytes_in limiter.
|
en: """The `bytes` limiter.
|
||||||
This is used to limit the inbound bytes rate for this EMQX node.
|
This is used to limit the inbound bytes rate for this EMQX node.
|
||||||
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
Once the limit is reached, the restricted client will be slow down even be hung for a while."""
|
||||||
zh: """流入字节率控制器。
|
zh: """流入字节率控制器。
|
||||||
这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
|
这个是用来控制当前节点上的数据流入的字节率,每条消息将会消耗和其二进制大小等量的令牌,当达到最大速率后,会话将会被限速甚至被强制挂起一小段时间"""
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: """Bytes In"""
|
en: """Bytes"""
|
||||||
zh: """流入字节率"""
|
zh: """流入字节率"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue