Merge pull request #8513 from lafirest/fix/limiter_interface
refactor(limiter): refactor the user interface
This commit is contained in:
commit
14920743ca
|
@ -89,10 +89,10 @@ the check/consume will succeed, but it will be forced to wait for a short period
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
per_client {
|
client {
|
||||||
desc {
|
desc {
|
||||||
en: """The rate limit for each user of the bucket, this field is not required"""
|
en: """The rate limit for each user of the bucket"""
|
||||||
zh: """对桶的每个使用者的速率控制设置,这个不是必须的"""
|
zh: """对桶的每个使用者的速率控制设置"""
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: """Per Client"""
|
en: """Per Client"""
|
||||||
|
@ -124,20 +124,6 @@ the check/consume will succeed, but it will be forced to wait for a short period
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
batch {
|
|
||||||
desc {
|
|
||||||
en: """The batch limiter.
|
|
||||||
This is used for EMQX internal batch operation
|
|
||||||
e.g. limit the retainer's deliver rate"""
|
|
||||||
zh: """批量操作速率控制器。
|
|
||||||
这是给 EMQX 内部的批量操作使用的,比如用来控制保留消息的派发速率"""
|
|
||||||
}
|
|
||||||
label: {
|
|
||||||
en: """Batch"""
|
|
||||||
zh: """批量操作"""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
message_routing {
|
message_routing {
|
||||||
desc {
|
desc {
|
||||||
en: """The message routing limiter.
|
en: """The message routing limiter.
|
||||||
|
@ -193,4 +179,12 @@ Once the limit is reached, the restricted client will be slow down even be hung
|
||||||
zh: """流入字节率"""
|
zh: """流入字节率"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal {
|
||||||
|
desc {
|
||||||
|
en: """Limiter for EMQX internal app."""
|
||||||
|
zh: """EMQX 内部功能所用限制器。"""
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -252,11 +252,12 @@ init(
|
||||||
<<>> -> undefined;
|
<<>> -> undefined;
|
||||||
MP -> MP
|
MP -> MP
|
||||||
end,
|
end,
|
||||||
|
ListenerId = emqx_listeners:listener_id(Type, Listener),
|
||||||
ClientInfo = set_peercert_infos(
|
ClientInfo = set_peercert_infos(
|
||||||
Peercert,
|
Peercert,
|
||||||
#{
|
#{
|
||||||
zone => Zone,
|
zone => Zone,
|
||||||
listener => emqx_listeners:listener_id(Type, Listener),
|
listener => ListenerId,
|
||||||
protocol => Protocol,
|
protocol => Protocol,
|
||||||
peerhost => PeerHost,
|
peerhost => PeerHost,
|
||||||
sockport => SockPort,
|
sockport => SockPort,
|
||||||
|
@ -278,7 +279,9 @@ init(
|
||||||
outbound => #{}
|
outbound => #{}
|
||||||
},
|
},
|
||||||
auth_cache = #{},
|
auth_cache = #{},
|
||||||
quota = emqx_limiter_container:get_limiter_by_names([?LIMITER_ROUTING], LimiterCfg),
|
quota = emqx_limiter_container:get_limiter_by_types(
|
||||||
|
ListenerId, [?LIMITER_ROUTING], LimiterCfg
|
||||||
|
),
|
||||||
timers = #{},
|
timers = #{},
|
||||||
conn_state = idle,
|
conn_state = idle,
|
||||||
takeover = false,
|
takeover = false,
|
||||||
|
@ -1199,9 +1202,6 @@ handle_call(
|
||||||
disconnect_and_shutdown(takenover, AllPendings, Channel);
|
disconnect_and_shutdown(takenover, AllPendings, Channel);
|
||||||
handle_call(list_authz_cache, Channel) ->
|
handle_call(list_authz_cache, Channel) ->
|
||||||
{reply, emqx_authz_cache:list_authz_cache(), Channel};
|
{reply, emqx_authz_cache:list_authz_cache(), Channel};
|
||||||
handle_call({quota, Bucket}, #channel{quota = Quota} = Channel) ->
|
|
||||||
Quota2 = emqx_limiter_container:update_by_name(message_routing, Bucket, Quota),
|
|
||||||
reply(ok, Channel#channel{quota = Quota2});
|
|
||||||
handle_call(
|
handle_call(
|
||||||
{keepalive, Interval},
|
{keepalive, Interval},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
|
|
@ -321,7 +321,7 @@ init_state(
|
||||||
},
|
},
|
||||||
|
|
||||||
LimiterTypes = [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
|
LimiterTypes = [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
|
||||||
Limiter = emqx_limiter_container:get_limiter_by_names(LimiterTypes, LimiterCfg),
|
Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg),
|
||||||
|
|
||||||
FrameOpts = #{
|
FrameOpts = #{
|
||||||
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
||||||
|
@ -672,12 +672,6 @@ handle_call(_From, info, State) ->
|
||||||
{reply, info(State), State};
|
{reply, info(State), State};
|
||||||
handle_call(_From, stats, State) ->
|
handle_call(_From, stats, State) ->
|
||||||
{reply, stats(State), State};
|
{reply, stats(State), State};
|
||||||
handle_call(_From, {ratelimit, Changes}, State = #state{limiter = Limiter}) ->
|
|
||||||
Fun = fun({Type, Bucket}, Acc) ->
|
|
||||||
emqx_limiter_container:update_by_name(Type, Bucket, Acc)
|
|
||||||
end,
|
|
||||||
Limiter2 = lists:foldl(Fun, Limiter, Changes),
|
|
||||||
{reply, ok, State#state{limiter = Limiter2}};
|
|
||||||
handle_call(_From, Req, State = #state{channel = Channel}) ->
|
handle_call(_From, Req, State = #state{channel = Channel}) ->
|
||||||
case emqx_channel:handle_call(Req, Channel) of
|
case emqx_channel:handle_call(Req, Channel) of
|
||||||
{reply, Reply, NChannel} ->
|
{reply, Reply, NChannel} ->
|
||||||
|
|
|
@ -19,12 +19,13 @@
|
||||||
-behaviour(esockd_generic_limiter).
|
-behaviour(esockd_generic_limiter).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([new_create_options/2, create/1, delete/1, consume/2]).
|
-export([new_create_options/3, create/1, delete/1, consume/2]).
|
||||||
|
|
||||||
-type create_options() :: #{
|
-type create_options() :: #{
|
||||||
module := ?MODULE,
|
module := ?MODULE,
|
||||||
|
id := emqx_limiter_schema:limiter_id(),
|
||||||
type := emqx_limiter_schema:limiter_type(),
|
type := emqx_limiter_schema:limiter_type(),
|
||||||
bucket := emqx_limiter_schema:bucket_name()
|
bucket := hocons:config()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -32,15 +33,16 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec new_create_options(
|
-spec new_create_options(
|
||||||
|
emqx_limiter_schema:limiter_id(),
|
||||||
emqx_limiter_schema:limiter_type(),
|
emqx_limiter_schema:limiter_type(),
|
||||||
emqx_limiter_schema:bucket_name()
|
hocons:config()
|
||||||
) -> create_options().
|
) -> create_options().
|
||||||
new_create_options(Type, BucketName) ->
|
new_create_options(Id, Type, BucketCfg) ->
|
||||||
#{module => ?MODULE, type => Type, bucket => BucketName}.
|
#{module => ?MODULE, id => Id, type => Type, bucket => BucketCfg}.
|
||||||
|
|
||||||
-spec create(create_options()) -> esockd_generic_limiter:limiter().
|
-spec create(create_options()) -> esockd_generic_limiter:limiter().
|
||||||
create(#{module := ?MODULE, type := Type, bucket := BucketName}) ->
|
create(#{module := ?MODULE, id := Id, type := Type, bucket := BucketCfg}) ->
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(Type, BucketName),
|
{ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfg),
|
||||||
#{module => ?MODULE, name => Type, limiter => Limiter}.
|
#{module => ?MODULE, name => Type, limiter => Limiter}.
|
||||||
|
|
||||||
delete(_GLimiter) ->
|
delete(_GLimiter) ->
|
||||||
|
|
|
@ -22,10 +22,8 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
new/0, new/1, new/2,
|
get_limiter_by_types/3,
|
||||||
get_limiter_by_names/2,
|
|
||||||
add_new/3,
|
add_new/3,
|
||||||
update_by_name/3,
|
|
||||||
set_retry_context/2,
|
set_retry_context/2,
|
||||||
check/3,
|
check/3,
|
||||||
retry/2,
|
retry/2,
|
||||||
|
@ -48,10 +46,10 @@
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type future() :: pos_integer().
|
-type future() :: pos_integer().
|
||||||
|
-type limiter_id() :: emqx_limiter_schema:limiter_id().
|
||||||
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
||||||
-type limiter() :: emqx_htb_limiter:limiter().
|
-type limiter() :: emqx_htb_limiter:limiter().
|
||||||
-type retry_context() :: emqx_htb_limiter:retry_context().
|
-type retry_context() :: emqx_htb_limiter:retry_context().
|
||||||
-type bucket_name() :: emqx_limiter_schema:bucket_name().
|
|
||||||
-type millisecond() :: non_neg_integer().
|
-type millisecond() :: non_neg_integer().
|
||||||
-type check_result() ::
|
-type check_result() ::
|
||||||
{ok, container()}
|
{ok, container()}
|
||||||
|
@ -64,46 +62,24 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec new() -> container().
|
|
||||||
new() ->
|
|
||||||
new([]).
|
|
||||||
|
|
||||||
%% @doc generate default data according to the type of limiter
|
|
||||||
-spec new(list(limiter_type())) -> container().
|
|
||||||
new(Types) ->
|
|
||||||
new(Types, #{}).
|
|
||||||
|
|
||||||
-spec new(
|
|
||||||
list(limiter_type()),
|
|
||||||
#{limiter_type() => emqx_limiter_schema:bucket_name()}
|
|
||||||
) -> container().
|
|
||||||
new(Types, Names) ->
|
|
||||||
get_limiter_by_names(Types, Names).
|
|
||||||
|
|
||||||
%% @doc generate a container
|
%% @doc generate a container
|
||||||
%% according to the type of limiter and the bucket name configuration of the limiter
|
%% according to the type of limiter and the bucket name configuration of the limiter
|
||||||
%% @end
|
%% @end
|
||||||
-spec get_limiter_by_names(
|
-spec get_limiter_by_types(
|
||||||
|
limiter_id() | {atom(), atom()},
|
||||||
list(limiter_type()),
|
list(limiter_type()),
|
||||||
#{limiter_type() => emqx_limiter_schema:bucket_name()}
|
#{limiter_type() => hocons:config()}
|
||||||
) -> container().
|
) -> container().
|
||||||
get_limiter_by_names(Types, BucketNames) ->
|
get_limiter_by_types({Type, Listener}, Types, BucketCfgs) ->
|
||||||
|
Id = emqx_listeners:listener_id(Type, Listener),
|
||||||
|
get_limiter_by_types(Id, Types, BucketCfgs);
|
||||||
|
get_limiter_by_types(Id, Types, BucketCfgs) ->
|
||||||
Init = fun(Type, Acc) ->
|
Init = fun(Type, Acc) ->
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames),
|
{ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfgs),
|
||||||
add_new(Type, Limiter, Acc)
|
add_new(Type, Limiter, Acc)
|
||||||
end,
|
end,
|
||||||
lists:foldl(Init, #{retry_ctx => undefined}, Types).
|
lists:foldl(Init, #{retry_ctx => undefined}, Types).
|
||||||
|
|
||||||
%% @doc add the specified type of limiter to the container
|
|
||||||
-spec update_by_name(
|
|
||||||
limiter_type(),
|
|
||||||
bucket_name() | #{limiter_type() => bucket_name()},
|
|
||||||
container()
|
|
||||||
) -> container().
|
|
||||||
update_by_name(Type, Buckets, Container) ->
|
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(Type, Buckets),
|
|
||||||
add_new(Type, Limiter, Container).
|
|
||||||
|
|
||||||
-spec add_new(limiter_type(), limiter(), container()) -> container().
|
-spec add_new(limiter_type(), limiter(), container()) -> container().
|
||||||
add_new(Type, Limiter, Container) ->
|
add_new(Type, Limiter, Container) ->
|
||||||
Container#{
|
Container#{
|
||||||
|
|
|
@ -24,11 +24,9 @@
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
find_bucket/1,
|
|
||||||
find_bucket/2,
|
find_bucket/2,
|
||||||
insert_bucket/2,
|
|
||||||
insert_bucket/3,
|
insert_bucket/3,
|
||||||
make_path/2,
|
delete_bucket/2,
|
||||||
post_config_update/5
|
post_config_update/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -50,20 +48,19 @@
|
||||||
format_status/2
|
format_status/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([path/0]).
|
-type limiter_id() :: emqx_limiter_schema:limiter_id().
|
||||||
|
|
||||||
-type path() :: list(atom()).
|
|
||||||
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
||||||
-type bucket_name() :: emqx_limiter_schema:bucket_name().
|
-type uid() :: {limiter_id(), limiter_type()}.
|
||||||
|
|
||||||
%% counter record in ets table
|
%% counter record in ets table
|
||||||
-record(bucket, {
|
-record(bucket, {
|
||||||
path :: path(),
|
uid :: uid(),
|
||||||
bucket :: bucket_ref()
|
bucket :: bucket_ref()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref().
|
-type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref().
|
||||||
|
|
||||||
|
-define(UID(Id, Type), {Id, Type}).
|
||||||
-define(TAB, emqx_limiter_counters).
|
-define(TAB, emqx_limiter_counters).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -85,14 +82,10 @@ restart_server(Type) ->
|
||||||
stop_server(Type) ->
|
stop_server(Type) ->
|
||||||
emqx_limiter_server_sup:stop(Type).
|
emqx_limiter_server_sup:stop(Type).
|
||||||
|
|
||||||
-spec find_bucket(limiter_type(), bucket_name()) ->
|
-spec find_bucket(limiter_id(), limiter_type()) ->
|
||||||
{ok, bucket_ref()} | undefined.
|
{ok, bucket_ref()} | undefined.
|
||||||
find_bucket(Type, BucketName) ->
|
find_bucket(Id, Type) ->
|
||||||
find_bucket(make_path(Type, BucketName)).
|
case ets:lookup(?TAB, ?UID(Id, Type)) of
|
||||||
|
|
||||||
-spec find_bucket(path()) -> {ok, bucket_ref()} | undefined.
|
|
||||||
find_bucket(Path) ->
|
|
||||||
case ets:lookup(?TAB, Path) of
|
|
||||||
[#bucket{bucket = Bucket}] ->
|
[#bucket{bucket = Bucket}] ->
|
||||||
{ok, Bucket};
|
{ok, Bucket};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -100,20 +93,19 @@ find_bucket(Path) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec insert_bucket(
|
-spec insert_bucket(
|
||||||
|
limiter_id(),
|
||||||
limiter_type(),
|
limiter_type(),
|
||||||
bucket_name(),
|
|
||||||
bucket_ref()
|
bucket_ref()
|
||||||
) -> boolean().
|
) -> boolean().
|
||||||
insert_bucket(Type, BucketName, Bucket) ->
|
insert_bucket(Id, Type, Bucket) ->
|
||||||
inner_insert_bucket(make_path(Type, BucketName), Bucket).
|
ets:insert(
|
||||||
|
?TAB,
|
||||||
|
#bucket{uid = ?UID(Id, Type), bucket = Bucket}
|
||||||
|
).
|
||||||
|
|
||||||
-spec insert_bucket(path(), bucket_ref()) -> true.
|
-spec delete_bucket(limiter_id(), limiter_type()) -> true.
|
||||||
insert_bucket(Path, Bucket) ->
|
delete_bucket(Type, Id) ->
|
||||||
inner_insert_bucket(Path, Bucket).
|
ets:delete(?TAB, ?UID(Id, Type)).
|
||||||
|
|
||||||
-spec make_path(limiter_type(), bucket_name()) -> path().
|
|
||||||
make_path(Type, BucketName) ->
|
|
||||||
[Type | BucketName].
|
|
||||||
|
|
||||||
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
|
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
|
||||||
Config = maps:get(Type, NewConf),
|
Config = maps:get(Type, NewConf),
|
||||||
|
@ -159,7 +151,7 @@ init([]) ->
|
||||||
set,
|
set,
|
||||||
public,
|
public,
|
||||||
named_table,
|
named_table,
|
||||||
{keypos, #bucket.path},
|
{keypos, #bucket.uid},
|
||||||
{write_concurrency, true},
|
{write_concurrency, true},
|
||||||
{read_concurrency, true},
|
{read_concurrency, true},
|
||||||
{heir, erlang:whereis(emqx_limiter_sup), none}
|
{heir, erlang:whereis(emqx_limiter_sup), none}
|
||||||
|
@ -266,9 +258,3 @@ format_status(_Opt, Status) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec inner_insert_bucket(path(), bucket_ref()) -> true.
|
|
||||||
inner_insert_bucket(Path, Bucket) ->
|
|
||||||
ets:insert(
|
|
||||||
?TAB,
|
|
||||||
#bucket{path = Path, bucket = Bucket}
|
|
||||||
).
|
|
||||||
|
|
|
@ -41,8 +41,10 @@
|
||||||
| message_in
|
| message_in
|
||||||
| connection
|
| connection
|
||||||
| message_routing
|
| message_routing
|
||||||
| batch.
|
%% internal limiter for unclassified resources
|
||||||
|
| internal.
|
||||||
|
|
||||||
|
-type limiter_id() :: atom().
|
||||||
-type bucket_name() :: atom().
|
-type bucket_name() :: atom().
|
||||||
-type rate() :: infinity | float().
|
-type rate() :: infinity | float().
|
||||||
-type burst_rate() :: 0 | float().
|
-type burst_rate() :: 0 | float().
|
||||||
|
@ -76,7 +78,7 @@
|
||||||
bucket_name/0
|
bucket_name/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([limiter_type/0, bucket_path/0]).
|
-export_type([limiter_id/0, limiter_type/0, bucket_path/0]).
|
||||||
|
|
||||||
-define(UNIT_TIME_IN_MS, 1000).
|
-define(UNIT_TIME_IN_MS, 1000).
|
||||||
|
|
||||||
|
@ -87,52 +89,50 @@ roots() -> [limiter].
|
||||||
fields(limiter) ->
|
fields(limiter) ->
|
||||||
[
|
[
|
||||||
{Type,
|
{Type,
|
||||||
?HOCON(?R_REF(limiter_opts), #{
|
?HOCON(?R_REF(node_opts), #{
|
||||||
desc => ?DESC(Type),
|
desc => ?DESC(Type),
|
||||||
default => make_limiter_default(Type)
|
default => #{}
|
||||||
})}
|
})}
|
||||||
|| Type <- types()
|
|| Type <- types()
|
||||||
|
] ++
|
||||||
|
[
|
||||||
|
{client,
|
||||||
|
?HOCON(
|
||||||
|
?R_REF(client_fields),
|
||||||
|
#{
|
||||||
|
desc => ?DESC(client),
|
||||||
|
default => maps:from_list([
|
||||||
|
{erlang:atom_to_binary(Type), #{}}
|
||||||
|
|| Type <- types()
|
||||||
|
])
|
||||||
|
}
|
||||||
|
)}
|
||||||
];
|
];
|
||||||
fields(limiter_opts) ->
|
fields(node_opts) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
|
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
|
||||||
{burst,
|
{burst,
|
||||||
?HOCON(burst_rate(), #{
|
?HOCON(burst_rate(), #{
|
||||||
desc => ?DESC(burst),
|
desc => ?DESC(burst),
|
||||||
default => 0
|
default => 0
|
||||||
})},
|
})}
|
||||||
{bucket,
|
];
|
||||||
?HOCON(
|
fields(client_fields) ->
|
||||||
?MAP("bucket_name", ?R_REF(bucket_opts)),
|
[
|
||||||
#{
|
{Type,
|
||||||
desc => ?DESC(bucket_cfg),
|
?HOCON(?R_REF(client_opts), #{
|
||||||
default => #{<<"default">> => #{}},
|
desc => ?DESC(Type),
|
||||||
example => #{
|
default => #{}
|
||||||
<<"mybucket-name">> => #{
|
})}
|
||||||
<<"rate">> => <<"infinity">>,
|
|| Type <- types()
|
||||||
<<"capcity">> => <<"infinity">>,
|
|
||||||
<<"initial">> => <<"100">>,
|
|
||||||
<<"per_client">> => #{<<"rate">> => <<"infinity">>}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)}
|
|
||||||
];
|
];
|
||||||
fields(bucket_opts) ->
|
fields(bucket_opts) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
|
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
|
||||||
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
|
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
|
||||||
{initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})},
|
{initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}
|
||||||
{per_client,
|
|
||||||
?HOCON(
|
|
||||||
?R_REF(client_bucket),
|
|
||||||
#{
|
|
||||||
default => #{},
|
|
||||||
desc => ?DESC(per_client)
|
|
||||||
}
|
|
||||||
)}
|
|
||||||
];
|
];
|
||||||
fields(client_bucket) ->
|
fields(client_opts) ->
|
||||||
[
|
[
|
||||||
{rate, ?HOCON(rate(), #{default => "infinity", desc => ?DESC(rate)})},
|
{rate, ?HOCON(rate(), #{default => "infinity", desc => ?DESC(rate)})},
|
||||||
{initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})},
|
{initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})},
|
||||||
|
@ -177,16 +177,30 @@ fields(client_bucket) ->
|
||||||
default => force
|
default => force
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
].
|
];
|
||||||
|
fields(listener_fields) ->
|
||||||
|
bucket_fields([bytes_in, message_in, connection, message_routing], listener_client_fields);
|
||||||
|
fields(listener_client_fields) ->
|
||||||
|
client_fields([bytes_in, message_in, connection, message_routing]);
|
||||||
|
fields(Type) ->
|
||||||
|
bucket_field(Type).
|
||||||
|
|
||||||
desc(limiter) ->
|
desc(limiter) ->
|
||||||
"Settings for the rate limiter.";
|
"Settings for the rate limiter.";
|
||||||
desc(limiter_opts) ->
|
desc(node_opts) ->
|
||||||
"Settings for the limiter.";
|
"Settings for the limiter of the node level.";
|
||||||
desc(bucket_opts) ->
|
desc(bucket_opts) ->
|
||||||
"Settings for the bucket.";
|
"Settings for the bucket.";
|
||||||
desc(client_bucket) ->
|
desc(client_opts) ->
|
||||||
"Settings for the client bucket.";
|
"Settings for the client in bucket level.";
|
||||||
|
desc(client_fields) ->
|
||||||
|
"Fields of the client level.";
|
||||||
|
desc(listener_fields) ->
|
||||||
|
"Fields of the listener.";
|
||||||
|
desc(listener_client_fields) ->
|
||||||
|
"Fields of the client level of the listener.";
|
||||||
|
desc(internal) ->
|
||||||
|
"Internal limiter.";
|
||||||
desc(_) ->
|
desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
@ -202,7 +216,7 @@ get_bucket_cfg_path(Type, BucketName) ->
|
||||||
[limiter, Type, bucket, BucketName].
|
[limiter, Type, bucket, BucketName].
|
||||||
|
|
||||||
types() ->
|
types() ->
|
||||||
[bytes_in, message_in, connection, message_routing, batch].
|
[bytes_in, message_in, connection, message_routing, internal].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -322,16 +336,44 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
|
||||||
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
|
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
|
||||||
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
|
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).
|
||||||
|
|
||||||
make_limiter_default(connection) ->
|
bucket_field(Type) when is_atom(Type) ->
|
||||||
|
fields(bucket_opts) ++
|
||||||
|
[
|
||||||
|
{client,
|
||||||
|
?HOCON(
|
||||||
|
?R_REF(?MODULE, client_opts),
|
||||||
#{
|
#{
|
||||||
<<"rate">> => <<"1000/s">>,
|
desc => ?DESC(client),
|
||||||
<<"bucket">> => #{
|
required => false
|
||||||
<<"default">> =>
|
}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
bucket_fields(Types, ClientRef) ->
|
||||||
|
[
|
||||||
|
{Type,
|
||||||
|
?HOCON(?R_REF(?MODULE, bucket_opts), #{
|
||||||
|
desc => ?DESC(?MODULE, Type),
|
||||||
|
required => false
|
||||||
|
})}
|
||||||
|
|| Type <- Types
|
||||||
|
] ++
|
||||||
|
[
|
||||||
|
{client,
|
||||||
|
?HOCON(
|
||||||
|
?R_REF(?MODULE, ClientRef),
|
||||||
#{
|
#{
|
||||||
<<"rate">> => <<"1000/s">>,
|
desc => ?DESC(client),
|
||||||
<<"capacity">> => 1000
|
required => false
|
||||||
}
|
}
|
||||||
}
|
)}
|
||||||
};
|
].
|
||||||
make_limiter_default(_) ->
|
|
||||||
#{}.
|
client_fields(Types) ->
|
||||||
|
[
|
||||||
|
{Type,
|
||||||
|
?HOCON(?R_REF(client_opts), #{
|
||||||
|
desc => ?DESC(Type),
|
||||||
|
required => false
|
||||||
|
})}
|
||||||
|
|| Type <- Types
|
||||||
|
].
|
||||||
|
|
|
@ -42,11 +42,13 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
start_link/2,
|
start_link/2,
|
||||||
connect/2,
|
connect/3,
|
||||||
|
add_bucket/3,
|
||||||
|
del_bucket/2,
|
||||||
|
get_initial_val/1,
|
||||||
whereis/1,
|
whereis/1,
|
||||||
info/1,
|
info/1,
|
||||||
name/1,
|
name/1,
|
||||||
get_initial_val/1,
|
|
||||||
restart/1,
|
restart/1,
|
||||||
update_config/2
|
update_config/2
|
||||||
]).
|
]).
|
||||||
|
@ -73,16 +75,17 @@
|
||||||
|
|
||||||
-type state() :: #{
|
-type state() :: #{
|
||||||
type := limiter_type(),
|
type := limiter_type(),
|
||||||
root := undefined | root(),
|
root := root(),
|
||||||
buckets := buckets(),
|
buckets := buckets(),
|
||||||
%% current counter to alloc
|
%% current counter to alloc
|
||||||
counter := undefined | counters:counters_ref(),
|
counter := counters:counters_ref(),
|
||||||
index := index()
|
index := 0 | index()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type buckets() :: #{bucket_name() => bucket()}.
|
-type buckets() :: #{bucket_name() => bucket()}.
|
||||||
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
-type limiter_type() :: emqx_limiter_schema:limiter_type().
|
||||||
-type bucket_name() :: emqx_limiter_schema:bucket_name().
|
-type bucket_name() :: emqx_limiter_schema:bucket_name().
|
||||||
|
-type limiter_id() :: emqx_limiter_schema:limiter_id().
|
||||||
-type rate() :: decimal().
|
-type rate() :: decimal().
|
||||||
-type flow() :: decimal().
|
-type flow() :: decimal().
|
||||||
-type capacity() :: decimal().
|
-type capacity() :: decimal().
|
||||||
|
@ -94,7 +97,7 @@
|
||||||
|
|
||||||
%% minimum coefficient for overloaded limiter
|
%% minimum coefficient for overloaded limiter
|
||||||
-define(OVERLOAD_MIN_ALLOC, 0.3).
|
-define(OVERLOAD_MIN_ALLOC, 0.3).
|
||||||
-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end).
|
-define(COUNTER_SIZE, 8).
|
||||||
|
|
||||||
-export_type([index/0]).
|
-export_type([index/0]).
|
||||||
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
|
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
|
||||||
|
@ -105,39 +108,49 @@
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec connect(
|
-spec connect(
|
||||||
|
limiter_id(),
|
||||||
limiter_type(),
|
limiter_type(),
|
||||||
bucket_name() | #{limiter_type() => bucket_name() | undefined}
|
bucket_name() | #{limiter_type() => bucket_name() | undefined}
|
||||||
) ->
|
) ->
|
||||||
{ok, emqx_htb_limiter:limiter()} | {error, _}.
|
{ok, emqx_htb_limiter:limiter()} | {error, _}.
|
||||||
%% If no bucket path is set in config, there will be no limit
|
%% If no bucket path is set in config, there will be no limit
|
||||||
connect(_Type, undefined) ->
|
connect(_Id, _Type, undefined) ->
|
||||||
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
connect(Type, BucketName) when is_atom(BucketName) ->
|
connect(Id, Type, Cfg) ->
|
||||||
case get_bucket_cfg(Type, BucketName) of
|
case find_limiter_cfg(Type, Cfg) of
|
||||||
undefined ->
|
{undefined, _} ->
|
||||||
?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
{error, config_not_found};
|
{
|
||||||
#{
|
#{
|
||||||
rate := BucketRate,
|
rate := BucketRate,
|
||||||
capacity := BucketSize,
|
capacity := BucketSize
|
||||||
per_client := #{rate := CliRate, capacity := CliSize} = Cfg
|
},
|
||||||
|
#{rate := CliRate, capacity := CliSize} = ClientCfg
|
||||||
} ->
|
} ->
|
||||||
case emqx_limiter_manager:find_bucket(Type, BucketName) of
|
case emqx_limiter_manager:find_bucket(Id, Type) of
|
||||||
{ok, Bucket} ->
|
{ok, Bucket} ->
|
||||||
{ok,
|
{ok,
|
||||||
if
|
if
|
||||||
CliRate < BucketRate orelse CliSize < BucketSize ->
|
CliRate < BucketRate orelse CliSize < BucketSize ->
|
||||||
emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
|
emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket);
|
||||||
true ->
|
true ->
|
||||||
emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
|
emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)
|
||||||
end};
|
end};
|
||||||
undefined ->
|
undefined ->
|
||||||
?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}),
|
?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
|
||||||
{error, invalid_bucket}
|
{error, invalid_bucket}
|
||||||
end
|
end
|
||||||
end;
|
end.
|
||||||
connect(Type, Paths) ->
|
|
||||||
connect(Type, maps:get(Type, Paths, undefined)).
|
-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
|
||||||
|
add_bucket(_Id, _Type, undefine) ->
|
||||||
|
ok;
|
||||||
|
add_bucket(Id, Type, Cfg) ->
|
||||||
|
?CALL(Type, {add_bucket, Id, Cfg}).
|
||||||
|
|
||||||
|
-spec del_bucket(limiter_id(), limiter_type()) -> ok.
|
||||||
|
del_bucket(Id, Type) ->
|
||||||
|
?CALL(Type, {del_bucket, Id}).
|
||||||
|
|
||||||
-spec info(limiter_type()) -> state() | {error, _}.
|
-spec info(limiter_type()) -> state() | {error, _}.
|
||||||
info(Type) ->
|
info(Type) ->
|
||||||
|
@ -213,6 +226,12 @@ handle_call(restart, _From, #{type := Type}) ->
|
||||||
handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
|
handle_call({update_config, Type, Config}, _From, #{type := Type}) ->
|
||||||
NewState = init_tree(Type, Config),
|
NewState = init_tree(Type, Config),
|
||||||
{reply, ok, NewState};
|
{reply, ok, NewState};
|
||||||
|
handle_call({add_bucket, Id, Cfg}, _From, State) ->
|
||||||
|
NewState = do_add_bucket(Id, Cfg, State),
|
||||||
|
{reply, ok, NewState};
|
||||||
|
handle_call({del_bucket, Id}, _From, State) ->
|
||||||
|
NewState = do_del_bucket(Id, State),
|
||||||
|
{reply, ok, NewState};
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -456,24 +475,14 @@ init_tree(Type) when is_atom(Type) ->
|
||||||
Cfg = emqx:get_config([limiter, Type]),
|
Cfg = emqx:get_config([limiter, Type]),
|
||||||
init_tree(Type, Cfg).
|
init_tree(Type, Cfg).
|
||||||
|
|
||||||
init_tree(Type, #{bucket := Buckets} = Cfg) ->
|
init_tree(Type, Cfg) ->
|
||||||
State = #{
|
#{
|
||||||
type => Type,
|
type => Type,
|
||||||
root => undefined,
|
root => make_root(Cfg),
|
||||||
counter => undefined,
|
counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
|
||||||
index => 1,
|
index => 0,
|
||||||
buckets => #{}
|
buckets => #{}
|
||||||
},
|
}.
|
||||||
|
|
||||||
Root = make_root(Cfg),
|
|
||||||
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []),
|
|
||||||
|
|
||||||
State2 = State#{
|
|
||||||
root := Root,
|
|
||||||
counter := counters:new(CounterNum, [write_concurrency])
|
|
||||||
},
|
|
||||||
|
|
||||||
lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
|
|
||||||
|
|
||||||
-spec make_root(hocons:confg()) -> root().
|
-spec make_root(hocons:confg()) -> root().
|
||||||
make_root(#{rate := Rate, burst := Burst}) ->
|
make_root(#{rate := Rate, burst := Burst}) ->
|
||||||
|
@ -484,79 +493,50 @@ make_root(#{rate := Rate, burst := Burst}) ->
|
||||||
produced => 0.0
|
produced => 0.0
|
||||||
}.
|
}.
|
||||||
|
|
||||||
make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
|
do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) ->
|
||||||
Path = emqx_limiter_manager:make_path(Type, Name),
|
case maps:get(Id, Buckets, undefined) of
|
||||||
Rate = get_counter_rate(Conf, GlobalCfg),
|
undefined ->
|
||||||
#{capacity := Capacity} = Conf,
|
make_bucket(Id, Cfg, State);
|
||||||
Initial = get_initial_val(Conf),
|
Bucket ->
|
||||||
CounterNum2 = CounterNum + 1,
|
Bucket2 = Bucket#{rate := Rate, capacity := Capacity},
|
||||||
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
|
State#{buckets := Buckets#{Id := Bucket2}}
|
||||||
{Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
|
end.
|
||||||
Bucket2 = Bucket#{counter := Counter, index := Idx},
|
|
||||||
State2#{buckets := Buckets#{BucketName => Bucket2}}
|
|
||||||
end,
|
|
||||||
|
|
||||||
|
make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) ->
|
||||||
|
make_bucket(Id, Cfg, State#{
|
||||||
|
counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
|
||||||
|
index => 0
|
||||||
|
});
|
||||||
|
make_bucket(
|
||||||
|
Id,
|
||||||
|
#{rate := Rate, capacity := Capacity} = Cfg,
|
||||||
|
#{type := Type, counter := Counter, index := Index, buckets := Buckets} = State
|
||||||
|
) ->
|
||||||
|
NewIndex = Index + 1,
|
||||||
|
Initial = get_initial_val(Cfg),
|
||||||
Bucket = #{
|
Bucket = #{
|
||||||
name => Name,
|
name => Id,
|
||||||
rate => Rate,
|
rate => Rate,
|
||||||
obtained => Initial,
|
obtained => Initial,
|
||||||
correction => 0,
|
correction => 0,
|
||||||
capacity => Capacity,
|
capacity => Capacity,
|
||||||
counter => undefined,
|
counter => Counter,
|
||||||
index => undefined
|
index => NewIndex
|
||||||
},
|
},
|
||||||
|
_ = put_to_counter(Counter, NewIndex, Initial),
|
||||||
|
Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate),
|
||||||
|
emqx_limiter_manager:insert_bucket(Id, Type, Ref),
|
||||||
|
State#{buckets := Buckets#{Id => Bucket}, index := NewIndex}.
|
||||||
|
|
||||||
DelayInit = ?CURRYING(Bucket, InitFun),
|
do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) ->
|
||||||
|
case maps:get(Id, Buckets, undefined) of
|
||||||
make_bucket(
|
undefined ->
|
||||||
T,
|
State;
|
||||||
Type,
|
|
||||||
GlobalCfg,
|
|
||||||
CounterNum2,
|
|
||||||
[DelayInit | DelayBuckets]
|
|
||||||
);
|
|
||||||
make_bucket([], _Type, _Global, CounterNum, DelayBuckets) ->
|
|
||||||
{CounterNum, DelayBuckets}.
|
|
||||||
|
|
||||||
-spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->
|
|
||||||
{counters:counters_ref(), pos_integer(), state()}.
|
|
||||||
alloc_counter(
|
|
||||||
Path,
|
|
||||||
Rate,
|
|
||||||
Initial,
|
|
||||||
#{counter := Counter, index := Index} = State
|
|
||||||
) ->
|
|
||||||
case emqx_limiter_manager:find_bucket(Path) of
|
|
||||||
{ok, #{
|
|
||||||
counter := ECounter,
|
|
||||||
index := EIndex
|
|
||||||
}} when ECounter =/= undefined ->
|
|
||||||
init_counter(Path, ECounter, EIndex, Rate, Initial, State);
|
|
||||||
_ ->
|
_ ->
|
||||||
init_counter(
|
emqx_limiter_manager:delete_bucket(Id, Type),
|
||||||
Path,
|
State#{buckets := maps:remove(Id, Buckets)}
|
||||||
Counter,
|
|
||||||
Index,
|
|
||||||
Rate,
|
|
||||||
Initial,
|
|
||||||
State#{index := Index + 1}
|
|
||||||
)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
init_counter(Path, Counter, Index, Rate, Initial, State) ->
|
|
||||||
_ = put_to_counter(Counter, Index, Initial),
|
|
||||||
Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate),
|
|
||||||
emqx_limiter_manager:insert_bucket(Path, Ref),
|
|
||||||
{Counter, Index, State}.
|
|
||||||
|
|
||||||
%% @doc find first limited node
|
|
||||||
get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity ->
|
|
||||||
Rate;
|
|
||||||
get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity ->
|
|
||||||
Rate;
|
|
||||||
get_counter_rate(_Cfg, _GlobalCfg) ->
|
|
||||||
emqx_limiter_schema:infinity_value().
|
|
||||||
|
|
||||||
-spec get_initial_val(hocons:config()) -> decimal().
|
-spec get_initial_val(hocons:config()) -> decimal().
|
||||||
get_initial_val(
|
get_initial_val(
|
||||||
#{
|
#{
|
||||||
|
@ -587,8 +567,21 @@ call(Type, Msg) ->
|
||||||
gen_server:call(Pid, Msg)
|
gen_server:call(Pid, Msg)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec get_bucket_cfg(limiter_type(), bucket_name()) ->
|
find_limiter_cfg(Type, #{rate := _} = Cfg) ->
|
||||||
undefined | limiter_not_started | hocons:config().
|
{Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))};
|
||||||
get_bucket_cfg(Type, Bucket) ->
|
find_limiter_cfg(Type, Cfg) ->
|
||||||
Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
|
{
|
||||||
emqx:get_config(Path, undefined).
|
maps:get(Type, Cfg, undefined),
|
||||||
|
find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined))
|
||||||
|
}.
|
||||||
|
|
||||||
|
find_client_cfg(Type, BucketCfg) ->
|
||||||
|
NodeCfg = emqx:get_config([limiter, client, Type], undefined),
|
||||||
|
merge_client_cfg(NodeCfg, BucketCfg).
|
||||||
|
|
||||||
|
merge_client_cfg(undefined, BucketCfg) ->
|
||||||
|
BucketCfg;
|
||||||
|
merge_client_cfg(NodeCfg, undefined) ->
|
||||||
|
NodeCfg;
|
||||||
|
merge_client_cfg(NodeCfg, BucketCfg) ->
|
||||||
|
maps:merge(NodeCfg, BucketCfg).
|
||||||
|
|
|
@ -279,12 +279,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
|
-spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
|
||||||
do_stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
|
|
||||||
esockd:close(listener_id(Type, ListenerName), ListenOn);
|
do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl ->
|
||||||
do_stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss ->
|
Id = listener_id(Type, ListenerName),
|
||||||
cowboy:stop_listener(listener_id(Type, ListenerName));
|
del_limiter_bucket(Id, Conf),
|
||||||
do_stop_listener(quic, ListenerName, _Conf) ->
|
esockd:close(Id, ListenOn);
|
||||||
quicer:stop_listener(listener_id(quic, ListenerName)).
|
do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss ->
|
||||||
|
Id = listener_id(Type, ListenerName),
|
||||||
|
del_limiter_bucket(Id, Conf),
|
||||||
|
cowboy:stop_listener(Id);
|
||||||
|
do_stop_listener(quic, ListenerName, Conf) ->
|
||||||
|
Id = listener_id(quic, ListenerName),
|
||||||
|
del_limiter_bucket(Id, Conf),
|
||||||
|
quicer:stop_listener(Id).
|
||||||
|
|
||||||
-ifndef(TEST).
|
-ifndef(TEST).
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
||||||
|
@ -300,10 +307,12 @@ do_start_listener(_Type, _ListenerName, #{enabled := false}) ->
|
||||||
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
|
do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
|
||||||
Type == tcp; Type == ssl
|
Type == tcp; Type == ssl
|
||||||
->
|
->
|
||||||
|
Id = listener_id(Type, ListenerName),
|
||||||
|
add_limiter_bucket(Id, Opts),
|
||||||
esockd:open(
|
esockd:open(
|
||||||
listener_id(Type, ListenerName),
|
Id,
|
||||||
ListenOn,
|
ListenOn,
|
||||||
merge_default(esockd_opts(Type, Opts)),
|
merge_default(esockd_opts(Id, Type, Opts)),
|
||||||
{emqx_connection, start_link, [
|
{emqx_connection, start_link, [
|
||||||
#{
|
#{
|
||||||
listener => {Type, ListenerName},
|
listener => {Type, ListenerName},
|
||||||
|
@ -318,6 +327,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
|
||||||
Type == ws; Type == wss
|
Type == ws; Type == wss
|
||||||
->
|
->
|
||||||
Id = listener_id(Type, ListenerName),
|
Id = listener_id(Type, ListenerName),
|
||||||
|
add_limiter_bucket(Id, Opts),
|
||||||
RanchOpts = ranch_opts(Type, ListenOn, Opts),
|
RanchOpts = ranch_opts(Type, ListenOn, Opts),
|
||||||
WsOpts = ws_opts(Type, ListenerName, Opts),
|
WsOpts = ws_opts(Type, ListenerName, Opts),
|
||||||
case Type of
|
case Type of
|
||||||
|
@ -352,8 +362,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
|
||||||
limiter => limiter(Opts)
|
limiter => limiter(Opts)
|
||||||
},
|
},
|
||||||
StreamOpts = [{stream_callback, emqx_quic_stream}],
|
StreamOpts = [{stream_callback, emqx_quic_stream}],
|
||||||
|
Id = listener_id(quic, ListenerName),
|
||||||
|
add_limiter_bucket(Id, Opts),
|
||||||
quicer:start_listener(
|
quicer:start_listener(
|
||||||
listener_id(quic, ListenerName),
|
Id,
|
||||||
port(ListenOn),
|
port(ListenOn),
|
||||||
{ListenOpts, ConnectionOpts, StreamOpts}
|
{ListenOpts, ConnectionOpts, StreamOpts}
|
||||||
);
|
);
|
||||||
|
@ -410,16 +422,18 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo
|
||||||
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
|
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
esockd_opts(Type, Opts0) ->
|
esockd_opts(ListenerId, Type, Opts0) ->
|
||||||
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
||||||
Limiter = limiter(Opts0),
|
Limiter = limiter(Opts0),
|
||||||
Opts2 =
|
Opts2 =
|
||||||
case maps:get(connection, Limiter, undefined) of
|
case maps:get(connection, Limiter, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Opts1;
|
Opts1;
|
||||||
BucketName ->
|
BucketCfg ->
|
||||||
Opts1#{
|
Opts1#{
|
||||||
limiter => emqx_esockd_htb_limiter:new_create_options(connection, BucketName)
|
limiter => emqx_esockd_htb_limiter:new_create_options(
|
||||||
|
ListenerId, connection, BucketCfg
|
||||||
|
)
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Opts3 = Opts2#{
|
Opts3 = Opts2#{
|
||||||
|
@ -539,6 +553,27 @@ zone(Opts) ->
|
||||||
limiter(Opts) ->
|
limiter(Opts) ->
|
||||||
maps:get(limiter, Opts, #{}).
|
maps:get(limiter, Opts, #{}).
|
||||||
|
|
||||||
|
add_limiter_bucket(Id, #{limiter := Limiter}) ->
|
||||||
|
maps:fold(
|
||||||
|
fun(Type, Cfg, _) ->
|
||||||
|
emqx_limiter_server:add_bucket(Id, Type, Cfg)
|
||||||
|
end,
|
||||||
|
ok,
|
||||||
|
maps:without([client], Limiter)
|
||||||
|
);
|
||||||
|
add_limiter_bucket(_Id, _Cfg) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
del_limiter_bucket(Id, #{limiter := Limiters}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(Type) ->
|
||||||
|
emqx_limiter_server:del_bucket(Id, Type)
|
||||||
|
end,
|
||||||
|
maps:keys(Limiters)
|
||||||
|
);
|
||||||
|
del_limiter_bucket(_Id, _Cfg) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
enable_authn(Opts) ->
|
enable_authn(Opts) ->
|
||||||
maps:get(enable_authn, Opts, true).
|
maps:get(enable_authn, Opts, true).
|
||||||
|
|
||||||
|
|
|
@ -1619,10 +1619,15 @@ base_listener(Bind) ->
|
||||||
)},
|
)},
|
||||||
{"limiter",
|
{"limiter",
|
||||||
sc(
|
sc(
|
||||||
map("ratelimit_name", emqx_limiter_schema:bucket_name()),
|
?R_REF(
|
||||||
|
emqx_limiter_schema,
|
||||||
|
listener_fields
|
||||||
|
),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC(base_listener_limiter),
|
desc => ?DESC(base_listener_limiter),
|
||||||
default => #{<<"connection">> => <<"default">>}
|
default => #{
|
||||||
|
<<"connection">> => #{<<"rate">> => <<"1000/s">>, <<"capacity">> => 1000}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"enable_authn",
|
{"enable_authn",
|
||||||
|
|
|
@ -273,7 +273,7 @@ check_origin_header(Req, #{listener := {Type, Listener}} = Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
websocket_init([Req, Opts]) ->
|
websocket_init([Req, Opts]) ->
|
||||||
#{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener}} = Opts,
|
#{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener} = ListenerCfg} = Opts,
|
||||||
case check_max_connection(Type, Listener) of
|
case check_max_connection(Type, Listener) of
|
||||||
allow ->
|
allow ->
|
||||||
{Peername, PeerCert} = get_peer_info(Type, Listener, Req, Opts),
|
{Peername, PeerCert} = get_peer_info(Type, Listener, Req, Opts),
|
||||||
|
@ -287,8 +287,10 @@ websocket_init([Req, Opts]) ->
|
||||||
ws_cookie => WsCookie,
|
ws_cookie => WsCookie,
|
||||||
conn_mod => ?MODULE
|
conn_mod => ?MODULE
|
||||||
},
|
},
|
||||||
Limiter = emqx_limiter_container:get_limiter_by_names(
|
Limiter = emqx_limiter_container:get_limiter_by_types(
|
||||||
[?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN], LimiterCfg
|
ListenerCfg,
|
||||||
|
[?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN],
|
||||||
|
LimiterCfg
|
||||||
),
|
),
|
||||||
MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback),
|
MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback),
|
||||||
FrameOpts = #{
|
FrameOpts = #{
|
||||||
|
@ -487,9 +489,6 @@ handle_call(From, info, State) ->
|
||||||
handle_call(From, stats, State) ->
|
handle_call(From, stats, State) ->
|
||||||
gen_server:reply(From, stats(State)),
|
gen_server:reply(From, stats(State)),
|
||||||
return(State);
|
return(State);
|
||||||
handle_call(_From, {ratelimit, Type, Bucket}, State = #state{limiter = Limiter}) ->
|
|
||||||
Limiter2 = emqx_limiter_container:update_by_name(Type, Bucket, Limiter),
|
|
||||||
{reply, ok, State#state{limiter = Limiter2}};
|
|
||||||
handle_call(From, Req, State = #state{channel = Channel}) ->
|
handle_call(From, Req, State = #state{channel = Channel}) ->
|
||||||
case emqx_channel:handle_call(Req, Channel) of
|
case emqx_channel:handle_call(Req, Channel) of
|
||||||
{reply, Reply, NChannel} ->
|
{reply, Reply, NChannel} ->
|
||||||
|
|
|
@ -33,18 +33,6 @@ force_gc_conf() ->
|
||||||
force_shutdown_conf() ->
|
force_shutdown_conf() ->
|
||||||
#{enable => true, max_heap_size => 4194304, max_message_queue_len => 1000}.
|
#{enable => true, max_heap_size => 4194304, max_message_queue_len => 1000}.
|
||||||
|
|
||||||
rate_limit_conf() ->
|
|
||||||
#{
|
|
||||||
conn_bytes_in => ["100KB", "10s"],
|
|
||||||
conn_messages_in => ["100", "10s"],
|
|
||||||
max_conn_rate => 1000,
|
|
||||||
quota =>
|
|
||||||
#{
|
|
||||||
conn_messages_routing => infinity,
|
|
||||||
overall_messages_routing => infinity
|
|
||||||
}
|
|
||||||
}.
|
|
||||||
|
|
||||||
rpc_conf() ->
|
rpc_conf() ->
|
||||||
#{
|
#{
|
||||||
async_batch_size => 256,
|
async_batch_size => 256,
|
||||||
|
@ -173,27 +161,9 @@ listeners_conf() ->
|
||||||
limiter_conf() ->
|
limiter_conf() ->
|
||||||
Make = fun() ->
|
Make = fun() ->
|
||||||
#{
|
#{
|
||||||
bucket =>
|
|
||||||
#{
|
|
||||||
default =>
|
|
||||||
#{
|
|
||||||
capacity => infinity,
|
|
||||||
initial => 0,
|
|
||||||
rate => infinity,
|
|
||||||
per_client =>
|
|
||||||
#{
|
|
||||||
capacity => infinity,
|
|
||||||
divisible => false,
|
|
||||||
failure_strategy => force,
|
|
||||||
initial => 0,
|
|
||||||
low_watermark => 0,
|
|
||||||
max_retry_time => 5000,
|
|
||||||
rate => infinity
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
burst => 0,
|
burst => 0,
|
||||||
rate => infinity
|
rate => infinity,
|
||||||
|
capacity => infinity
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -202,7 +172,7 @@ limiter_conf() ->
|
||||||
Acc#{Name => Make()}
|
Acc#{Name => Make()}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
[bytes_in, message_in, message_routing, connection, batch]
|
[bytes_in, message_in, message_routing, connection, internal]
|
||||||
).
|
).
|
||||||
|
|
||||||
stats_conf() ->
|
stats_conf() ->
|
||||||
|
@ -213,7 +183,6 @@ zone_conf() ->
|
||||||
|
|
||||||
basic_conf() ->
|
basic_conf() ->
|
||||||
#{
|
#{
|
||||||
rate_limit => rate_limit_conf(),
|
|
||||||
force_gc => force_gc_conf(),
|
force_gc => force_gc_conf(),
|
||||||
force_shutdown => force_shutdown_conf(),
|
force_shutdown => force_shutdown_conf(),
|
||||||
mqtt => mqtt_conf(),
|
mqtt => mqtt_conf(),
|
||||||
|
@ -274,10 +243,9 @@ end_per_suite(_Config) ->
|
||||||
emqx_banned
|
emqx_banned
|
||||||
]).
|
]).
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
OldConf = set_test_listener_confs(),
|
OldConf = set_test_listener_confs(),
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
check_modify_limiter(TestCase),
|
|
||||||
[{config, OldConf} | Config].
|
[{config, OldConf} | Config].
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
@ -285,41 +253,6 @@ end_per_testcase(_TestCase, Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
emqx_common_test_helpers:stop_apps([]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
check_modify_limiter(TestCase) ->
|
|
||||||
Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2],
|
|
||||||
case lists:member(TestCase, Checks) of
|
|
||||||
true ->
|
|
||||||
modify_limiter();
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% per_client 5/1s,5
|
|
||||||
%% aggregated 10/1s,10
|
|
||||||
modify_limiter() ->
|
|
||||||
Limiter = emqx_config:get([limiter]),
|
|
||||||
#{message_routing := #{bucket := Bucket} = Routing} = Limiter,
|
|
||||||
#{default := #{per_client := Client} = Default} = Bucket,
|
|
||||||
Client2 = Client#{
|
|
||||||
rate := 5,
|
|
||||||
initial := 0,
|
|
||||||
capacity := 5,
|
|
||||||
low_watermark := 1
|
|
||||||
},
|
|
||||||
Default2 = Default#{
|
|
||||||
per_client := Client2,
|
|
||||||
rate => 10,
|
|
||||||
initial => 0,
|
|
||||||
capacity => 10
|
|
||||||
},
|
|
||||||
Bucket2 = Bucket#{default := Default2},
|
|
||||||
Routing2 = Routing#{bucket := Bucket2},
|
|
||||||
|
|
||||||
emqx_config:put([limiter], Limiter#{message_routing := Routing2}),
|
|
||||||
emqx_limiter_manager:restart_server(message_routing),
|
|
||||||
timer:sleep(100),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for channel info/stats/caps
|
%% Test cases for channel info/stats/caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -729,6 +662,7 @@ t_process_unsubscribe(_) ->
|
||||||
|
|
||||||
t_quota_qos0(_) ->
|
t_quota_qos0(_) ->
|
||||||
esockd_limiter:start_link(),
|
esockd_limiter:start_link(),
|
||||||
|
add_bucket(),
|
||||||
Cnter = counters:new(1, []),
|
Cnter = counters:new(1, []),
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
ok = meck:expect(
|
ok = meck:expect(
|
||||||
|
@ -755,10 +689,12 @@ t_quota_qos0(_) ->
|
||||||
|
|
||||||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||||
|
del_bucket(),
|
||||||
esockd_limiter:stop().
|
esockd_limiter:stop().
|
||||||
|
|
||||||
t_quota_qos1(_) ->
|
t_quota_qos1(_) ->
|
||||||
esockd_limiter:start_link(),
|
esockd_limiter:start_link(),
|
||||||
|
add_bucket(),
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
Chann = channel(#{conn_state => connected, quota => quota()}),
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
||||||
Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
||||||
|
@ -769,10 +705,12 @@ t_quota_qos1(_) ->
|
||||||
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
|
||||||
%% Quota in overall
|
%% Quota in overall
|
||||||
{ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
|
{ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
|
||||||
|
del_bucket(),
|
||||||
esockd_limiter:stop().
|
esockd_limiter:stop().
|
||||||
|
|
||||||
t_quota_qos2(_) ->
|
t_quota_qos2(_) ->
|
||||||
esockd_limiter:start_link(),
|
esockd_limiter:start_link(),
|
||||||
|
add_bucket(),
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
Chann = channel(#{conn_state => connected, quota => quota()}),
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
||||||
Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
|
@ -786,6 +724,7 @@ t_quota_qos2(_) ->
|
||||||
{ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
|
{ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
|
||||||
%% Quota in overall
|
%% Quota in overall
|
||||||
{ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
|
{ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
|
||||||
|
del_bucket(),
|
||||||
esockd_limiter:stop().
|
esockd_limiter:stop().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -952,12 +891,6 @@ t_handle_call_takeover_end(_) ->
|
||||||
{shutdown, takenover, [], _, _Chan} =
|
{shutdown, takenover, [], _, _Chan} =
|
||||||
emqx_channel:handle_call({takeover, 'end'}, channel()).
|
emqx_channel:handle_call({takeover, 'end'}, channel()).
|
||||||
|
|
||||||
t_handle_call_quota(_) ->
|
|
||||||
{reply, ok, _Chan} = emqx_channel:handle_call(
|
|
||||||
{quota, default},
|
|
||||||
channel()
|
|
||||||
).
|
|
||||||
|
|
||||||
t_handle_call_unexpected(_) ->
|
t_handle_call_unexpected(_) ->
|
||||||
{reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
|
{reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
|
||||||
|
|
||||||
|
@ -1176,7 +1109,7 @@ t_ws_cookie_init(_) ->
|
||||||
ConnInfo,
|
ConnInfo,
|
||||||
#{
|
#{
|
||||||
zone => default,
|
zone => default,
|
||||||
limiter => limiter_cfg(),
|
limiter => undefined,
|
||||||
listener => {tcp, default}
|
listener => {tcp, default}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -1210,7 +1143,7 @@ channel(InitFields) ->
|
||||||
ConnInfo,
|
ConnInfo,
|
||||||
#{
|
#{
|
||||||
zone => default,
|
zone => default,
|
||||||
limiter => limiter_cfg(),
|
limiter => undefined,
|
||||||
listener => {tcp, default}
|
listener => {tcp, default}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -1270,9 +1203,31 @@ session(InitFields) when is_map(InitFields) ->
|
||||||
|
|
||||||
%% conn: 5/s; overall: 10/s
|
%% conn: 5/s; overall: 10/s
|
||||||
quota() ->
|
quota() ->
|
||||||
emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()).
|
emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()).
|
||||||
|
|
||||||
limiter_cfg() -> #{message_routing => default}.
|
limiter_cfg() ->
|
||||||
|
Client = #{
|
||||||
|
rate => 5,
|
||||||
|
initial => 0,
|
||||||
|
capacity => 5,
|
||||||
|
low_watermark => 1,
|
||||||
|
divisible => false,
|
||||||
|
max_retry_time => timer:seconds(5),
|
||||||
|
failure_strategy => force
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
message_routing => bucket_cfg(),
|
||||||
|
client => #{message_routing => Client}
|
||||||
|
}.
|
||||||
|
|
||||||
|
bucket_cfg() ->
|
||||||
|
#{rate => 10, initial => 0, capacity => 10}.
|
||||||
|
|
||||||
|
add_bucket() ->
|
||||||
|
emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()).
|
||||||
|
|
||||||
|
del_bucket() ->
|
||||||
|
emqx_limiter_server:del_bucket(?MODULE, message_routing).
|
||||||
|
|
||||||
v4(Channel) ->
|
v4(Channel) ->
|
||||||
ConnInfo = emqx_channel:info(conninfo, Channel),
|
ConnInfo = emqx_channel:info(conninfo, Channel),
|
||||||
|
|
|
@ -78,6 +78,7 @@ end_per_suite(_Config) ->
|
||||||
init_per_testcase(TestCase, Config) when
|
init_per_testcase(TestCase, Config) when
|
||||||
TestCase =/= t_ws_pingreq_before_connected
|
TestCase =/= t_ws_pingreq_before_connected
|
||||||
->
|
->
|
||||||
|
add_bucket(),
|
||||||
ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
|
ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end),
|
||||||
ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
|
ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end),
|
||||||
ok = meck:expect(
|
ok = meck:expect(
|
||||||
|
@ -104,9 +105,11 @@ init_per_testcase(TestCase, Config) when
|
||||||
_ -> Config
|
_ -> Config
|
||||||
end;
|
end;
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
add_bucket(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TestCase, Config) ->
|
end_per_testcase(TestCase, Config) ->
|
||||||
|
del_bucket(),
|
||||||
case erlang:function_exported(?MODULE, TestCase, 2) of
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||||
true -> ?MODULE:TestCase('end', Config);
|
true -> ?MODULE:TestCase('end', Config);
|
||||||
false -> ok
|
false -> ok
|
||||||
|
@ -291,11 +294,6 @@ t_handle_call(_) ->
|
||||||
?assertMatch({ok, _St}, handle_msg({event, undefined}, St)),
|
?assertMatch({ok, _St}, handle_msg({event, undefined}, St)),
|
||||||
?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)),
|
?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)),
|
||||||
?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)),
|
?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)),
|
||||||
?assertMatch({reply, ok, _NSt}, handle_call(self(), {ratelimit, []}, St)),
|
|
||||||
?assertMatch(
|
|
||||||
{reply, ok, _NSt},
|
|
||||||
handle_call(self(), {ratelimit, [{bytes_in, default}]}, St)
|
|
||||||
),
|
|
||||||
?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)),
|
?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{stop, {shutdown, kicked}, ok, _NSt},
|
{stop, {shutdown, kicked}, ok, _NSt},
|
||||||
|
@ -704,7 +702,34 @@ handle_msg(Msg, St) -> emqx_connection:handle_msg(Msg, St).
|
||||||
|
|
||||||
handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
|
handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St).
|
||||||
|
|
||||||
limiter_cfg() -> #{}.
|
-define(LIMITER_ID, 'tcp:default').
|
||||||
|
|
||||||
init_limiter() ->
|
init_limiter() ->
|
||||||
emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).
|
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()).
|
||||||
|
|
||||||
|
limiter_cfg() ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
Cfg = bucket_cfg(),
|
||||||
|
Client = #{
|
||||||
|
rate => Infinity,
|
||||||
|
initial => 0,
|
||||||
|
capacity => Infinity,
|
||||||
|
low_watermark => 1,
|
||||||
|
divisible => false,
|
||||||
|
max_retry_time => timer:seconds(5),
|
||||||
|
failure_strategy => force
|
||||||
|
},
|
||||||
|
#{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
|
||||||
|
|
||||||
|
bucket_cfg() ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
#{rate => Infinity, initial => 0, capacity => Infinity}.
|
||||||
|
|
||||||
|
add_bucket() ->
|
||||||
|
Cfg = bucket_cfg(),
|
||||||
|
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
|
||||||
|
emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
|
||||||
|
|
||||||
|
del_bucket() ->
|
||||||
|
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
|
||||||
|
emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).
|
||||||
|
|
|
@ -24,48 +24,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(BASE_CONF, <<
|
-define(BASE_CONF, <<"">>).
|
||||||
""
|
|
||||||
"\n"
|
|
||||||
"limiter {\n"
|
|
||||||
" bytes_in {\n"
|
|
||||||
" bucket.default {\n"
|
|
||||||
" rate = infinity\n"
|
|
||||||
" capacity = infinity\n"
|
|
||||||
" }\n"
|
|
||||||
" }\n"
|
|
||||||
"\n"
|
|
||||||
" message_in {\n"
|
|
||||||
" bucket.default {\n"
|
|
||||||
" rate = infinity\n"
|
|
||||||
" capacity = infinity\n"
|
|
||||||
" }\n"
|
|
||||||
" }\n"
|
|
||||||
"\n"
|
|
||||||
" connection {\n"
|
|
||||||
" bucket.default {\n"
|
|
||||||
" rate = infinity\n"
|
|
||||||
" capacity = infinity\n"
|
|
||||||
" }\n"
|
|
||||||
" }\n"
|
|
||||||
"\n"
|
|
||||||
" message_routing {\n"
|
|
||||||
" bucket.default {\n"
|
|
||||||
" rate = infinity\n"
|
|
||||||
" capacity = infinity\n"
|
|
||||||
" }\n"
|
|
||||||
" }\n"
|
|
||||||
"\n"
|
|
||||||
" batch {\n"
|
|
||||||
" bucket.retainer {\n"
|
|
||||||
" rate = infinity\n"
|
|
||||||
" capacity = infinity\n"
|
|
||||||
" }\n"
|
|
||||||
" }\n"
|
|
||||||
"}\n"
|
|
||||||
"\n"
|
|
||||||
""
|
|
||||||
>>).
|
|
||||||
|
|
||||||
-record(client, {
|
-record(client, {
|
||||||
counter :: counters:counter_ref(),
|
counter :: counters:counter_ref(),
|
||||||
|
@ -97,6 +56,9 @@ end_per_suite(_Config) ->
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
load_conf() ->
|
load_conf() ->
|
||||||
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
|
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
|
||||||
|
|
||||||
|
@ -116,12 +78,12 @@ t_consume(_) ->
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
{ok, L2} = emqx_htb_limiter:consume(50, Client),
|
{ok, L2} = emqx_htb_limiter:consume(50, Client),
|
||||||
{ok, _L3} = emqx_htb_limiter:consume(150, L2)
|
{ok, _L3} = emqx_htb_limiter:consume(150, L2)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_retry(_) ->
|
t_retry(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
|
@ -133,15 +95,15 @@ t_retry(_) ->
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
{ok, Client} = emqx_htb_limiter:retry(Client),
|
{ok, Client2} = emqx_htb_limiter:retry(Client),
|
||||||
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client2),
|
||||||
L3 = emqx_htb_limiter:set_retry(Retry, L2),
|
L3 = emqx_htb_limiter:set_retry(Retry, L2),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
{ok, _L4} = emqx_htb_limiter:retry(L3)
|
{ok, _L4} = emqx_htb_limiter:retry(L3)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_restore(_) ->
|
t_restore(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
|
@ -153,15 +115,15 @@ t_restore(_) ->
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
{ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
{ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
||||||
Avaiable = emqx_htb_limiter:available(L3),
|
Avaiable = emqx_htb_limiter:available(L3),
|
||||||
?assert(Avaiable >= 50)
|
?assert(Avaiable >= 50)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_max_retry_time(_) ->
|
t_max_retry_time(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
|
@ -172,15 +134,15 @@ t_max_retry_time(_) ->
|
||||||
failure_strategy := drop
|
failure_strategy := drop
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
Begin = ?NOW,
|
Begin = ?NOW,
|
||||||
Result = emqx_htb_limiter:consume(101, Client),
|
Result = emqx_htb_limiter:consume(101, Client),
|
||||||
?assertMatch({drop, _}, Result),
|
?assertMatch({drop, _}, Result),
|
||||||
Time = ?NOW - Begin,
|
Time = ?NOW - Begin,
|
||||||
?assert(Time >= 500 andalso Time < 550)
|
?assert(Time >= 500 andalso Time < 550)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_divisible(_) ->
|
t_divisible(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
|
@ -191,8 +153,8 @@ t_divisible(_) ->
|
||||||
capacity := 600
|
capacity := 600
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
Result = emqx_htb_limiter:check(1000, Client),
|
Result = emqx_htb_limiter:check(1000, Client),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{partial, 400,
|
{partial, 400,
|
||||||
|
@ -206,7 +168,7 @@ t_divisible(_) ->
|
||||||
Result
|
Result
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_low_watermark(_) ->
|
t_low_watermark(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
|
@ -217,8 +179,8 @@ t_low_watermark(_) ->
|
||||||
capacity := 1000
|
capacity := 1000
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(BucketCfg) ->
|
||||||
Client = connect(default),
|
Client = connect(BucketCfg),
|
||||||
Result = emqx_htb_limiter:check(500, Client),
|
Result = emqx_htb_limiter:check(500, Client),
|
||||||
?assertMatch({ok, _}, Result),
|
?assertMatch({ok, _}, Result),
|
||||||
{_, Client2} = Result,
|
{_, Client2} = Result,
|
||||||
|
@ -233,28 +195,21 @@ t_low_watermark(_) ->
|
||||||
Result2
|
Result2
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
t_infinity_client(_) ->
|
t_infinity_client(_) ->
|
||||||
Fun = fun(#{per_client := Cli} = Bucket) ->
|
Fun = fun(Cfg) -> Cfg end,
|
||||||
Bucket2 = Bucket#{
|
Case = fun(Cfg) ->
|
||||||
rate := infinity,
|
Client = connect(Cfg),
|
||||||
capacity := infinity
|
|
||||||
},
|
|
||||||
Cli2 = Cli#{rate := infinity, capacity := infinity},
|
|
||||||
Bucket2#{per_client := Cli2}
|
|
||||||
end,
|
|
||||||
Case = fun() ->
|
|
||||||
Client = connect(default),
|
|
||||||
InfVal = emqx_limiter_schema:infinity_value(),
|
InfVal = emqx_limiter_schema:infinity_value(),
|
||||||
?assertMatch(#{bucket := #{rate := InfVal}}, Client),
|
?assertMatch(#{bucket := #{rate := InfVal}}, Client),
|
||||||
Result = emqx_htb_limiter:check(100000, Client),
|
Result = emqx_htb_limiter:check(100000, Client),
|
||||||
?assertEqual({ok, Client}, Result)
|
?assertEqual({ok, Client}, Result)
|
||||||
end,
|
end,
|
||||||
with_bucket(default, Fun, Case).
|
with_per_client(Fun, Case).
|
||||||
|
|
||||||
t_try_restore_agg(_) ->
|
t_try_restore_agg(_) ->
|
||||||
Fun = fun(#{per_client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := 1,
|
rate := 1,
|
||||||
capacity := 200,
|
capacity := 200,
|
||||||
|
@ -267,20 +222,20 @@ t_try_restore_agg(_) ->
|
||||||
max_retry_time := 100,
|
max_retry_time := 100,
|
||||||
failure_strategy := force
|
failure_strategy := force
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(Cfg) ->
|
||||||
Client = connect(default),
|
Client = connect(Cfg),
|
||||||
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
{_, _, Retry, L2} = emqx_htb_limiter:check(150, Client),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
{ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
{ok, L3} = emqx_htb_limiter:check(Retry, L2),
|
||||||
Avaiable = emqx_htb_limiter:available(L3),
|
Avaiable = emqx_htb_limiter:available(L3),
|
||||||
?assert(Avaiable >= 50)
|
?assert(Avaiable >= 50)
|
||||||
end,
|
end,
|
||||||
with_bucket(default, Fun, Case).
|
with_bucket(Fun, Case).
|
||||||
|
|
||||||
t_short_board(_) ->
|
t_short_board(_) ->
|
||||||
Fun = fun(#{per_client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/1s"),
|
rate := ?RATE("100/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -291,18 +246,18 @@ t_short_board(_) ->
|
||||||
capacity := 600,
|
capacity := 600,
|
||||||
initial := 600
|
initial := 600
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(Cfg) ->
|
||||||
Counter = counters:new(1, []),
|
Counter = counters:new(1, []),
|
||||||
start_client(default, ?NOW + 2000, Counter, 20),
|
start_client(Cfg, ?NOW + 2000, Counter, 20),
|
||||||
timer:sleep(2100),
|
timer:sleep(2100),
|
||||||
check_average_rate(Counter, 2, 100)
|
check_average_rate(Counter, 2, 100)
|
||||||
end,
|
end,
|
||||||
with_bucket(default, Fun, Case).
|
with_bucket(Fun, Case).
|
||||||
|
|
||||||
t_rate(_) ->
|
t_rate(_) ->
|
||||||
Fun = fun(#{per_client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/100ms"),
|
rate := ?RATE("100/100ms"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -313,10 +268,10 @@ t_rate(_) ->
|
||||||
capacity := infinity,
|
capacity := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(Cfg) ->
|
||||||
Client = connect(default),
|
Client = connect(Cfg),
|
||||||
Ts1 = erlang:system_time(millisecond),
|
Ts1 = erlang:system_time(millisecond),
|
||||||
C1 = emqx_htb_limiter:available(Client),
|
C1 = emqx_htb_limiter:available(Client),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
@ -326,11 +281,11 @@ t_rate(_) ->
|
||||||
Inc = C2 - C1,
|
Inc = C2 - C1,
|
||||||
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
|
?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate")
|
||||||
end,
|
end,
|
||||||
with_bucket(default, Fun, Case).
|
with_bucket(Fun, Case).
|
||||||
|
|
||||||
t_capacity(_) ->
|
t_capacity(_) ->
|
||||||
Capacity = 600,
|
Capacity = 600,
|
||||||
Fun = fun(#{per_client := Cli} = Bucket) ->
|
Fun = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("100/100ms"),
|
rate := ?RATE("100/100ms"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -341,25 +296,25 @@ t_capacity(_) ->
|
||||||
capacity := infinity,
|
capacity := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(Cfg) ->
|
||||||
Client = connect(default),
|
Client = connect(Cfg),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
C1 = emqx_htb_limiter:available(Client),
|
C1 = emqx_htb_limiter:available(Client),
|
||||||
?assertEqual(Capacity, C1, "test bucket capacity")
|
?assertEqual(Capacity, C1, "test bucket capacity")
|
||||||
end,
|
end,
|
||||||
with_bucket(default, Fun, Case).
|
with_bucket(Fun, Case).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases Global Level
|
%% Test Cases Global Level
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
t_collaborative_alloc(_) ->
|
t_collaborative_alloc(_) ->
|
||||||
GlobalMod = fun(Cfg) ->
|
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
|
||||||
Cfg#{rate := ?RATE("600/1s")}
|
Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Bucket1 = fun(#{per_client := Cli} = Bucket) ->
|
Bucket1 = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("400/1s"),
|
rate := ?RATE("400/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -370,7 +325,7 @@ t_collaborative_alloc(_) ->
|
||||||
capacity := 100,
|
capacity := 100,
|
||||||
initial := 100
|
initial := 100
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Bucket2 = fun(Bucket) ->
|
Bucket2 = fun(Bucket) ->
|
||||||
|
@ -381,8 +336,8 @@ t_collaborative_alloc(_) ->
|
||||||
Case = fun() ->
|
Case = fun() ->
|
||||||
C1 = counters:new(1, []),
|
C1 = counters:new(1, []),
|
||||||
C2 = counters:new(1, []),
|
C2 = counters:new(1, []),
|
||||||
start_client(b1, ?NOW + 2000, C1, 20),
|
start_client({b1, Bucket1}, ?NOW + 2000, C1, 20),
|
||||||
start_client(b2, ?NOW + 2000, C2, 30),
|
start_client({b2, Bucket2}, ?NOW + 2000, C2, 30),
|
||||||
timer:sleep(2100),
|
timer:sleep(2100),
|
||||||
check_average_rate(C1, 2, 300),
|
check_average_rate(C1, 2, 300),
|
||||||
check_average_rate(C2, 2, 300)
|
check_average_rate(C2, 2, 300)
|
||||||
|
@ -395,14 +350,16 @@ t_collaborative_alloc(_) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_burst(_) ->
|
t_burst(_) ->
|
||||||
GlobalMod = fun(Cfg) ->
|
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
|
message_routing := MR#{
|
||||||
rate := ?RATE("200/1s"),
|
rate := ?RATE("200/1s"),
|
||||||
burst := ?RATE("400/1s")
|
burst := ?RATE("400/1s")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Bucket = fun(#{per_client := Cli} = Bucket) ->
|
Bucket = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := ?RATE("200/1s"),
|
rate := ?RATE("200/1s"),
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -413,16 +370,16 @@ t_burst(_) ->
|
||||||
capacity := 200,
|
capacity := 200,
|
||||||
divisible := true
|
divisible := true
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Case = fun() ->
|
Case = fun() ->
|
||||||
C1 = counters:new(1, []),
|
C1 = counters:new(1, []),
|
||||||
C2 = counters:new(1, []),
|
C2 = counters:new(1, []),
|
||||||
C3 = counters:new(1, []),
|
C3 = counters:new(1, []),
|
||||||
start_client(b1, ?NOW + 2000, C1, 20),
|
start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
|
||||||
start_client(b2, ?NOW + 2000, C2, 30),
|
start_client({b2, Bucket}, ?NOW + 2000, C2, 30),
|
||||||
start_client(b3, ?NOW + 2000, C3, 30),
|
start_client({b3, Bucket}, ?NOW + 2000, C3, 30),
|
||||||
timer:sleep(2100),
|
timer:sleep(2100),
|
||||||
|
|
||||||
Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
|
Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]),
|
||||||
|
@ -436,11 +393,11 @@ t_burst(_) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_limit_global_with_unlimit_other(_) ->
|
t_limit_global_with_unlimit_other(_) ->
|
||||||
GlobalMod = fun(Cfg) ->
|
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
|
||||||
Cfg#{rate := ?RATE("600/1s")}
|
Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Bucket = fun(#{per_client := Cli} = Bucket) ->
|
Bucket = fun(#{client := Cli} = Bucket) ->
|
||||||
Bucket2 = Bucket#{
|
Bucket2 = Bucket#{
|
||||||
rate := infinity,
|
rate := infinity,
|
||||||
initial := 0,
|
initial := 0,
|
||||||
|
@ -451,12 +408,12 @@ t_limit_global_with_unlimit_other(_) ->
|
||||||
capacity := infinity,
|
capacity := infinity,
|
||||||
initial := 0
|
initial := 0
|
||||||
},
|
},
|
||||||
Bucket2#{per_client := Cli2}
|
Bucket2#{client := Cli2}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Case = fun() ->
|
Case = fun() ->
|
||||||
C1 = counters:new(1, []),
|
C1 = counters:new(1, []),
|
||||||
start_client(b1, ?NOW + 2000, C1, 20),
|
start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
|
||||||
timer:sleep(2100),
|
timer:sleep(2100),
|
||||||
check_average_rate(C1, 2, 600)
|
check_average_rate(C1, 2, 600)
|
||||||
end,
|
end,
|
||||||
|
@ -470,28 +427,6 @@ t_limit_global_with_unlimit_other(_) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases container
|
%% Test Cases container
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
t_new_container(_) ->
|
|
||||||
C1 = emqx_limiter_container:new(),
|
|
||||||
C2 = emqx_limiter_container:new([message_routing]),
|
|
||||||
C3 = emqx_limiter_container:update_by_name(message_routing, default, C1),
|
|
||||||
?assertMatch(
|
|
||||||
#{
|
|
||||||
message_routing := _,
|
|
||||||
retry_ctx := undefined,
|
|
||||||
{retry, message_routing} := _
|
|
||||||
},
|
|
||||||
C2
|
|
||||||
),
|
|
||||||
?assertMatch(
|
|
||||||
#{
|
|
||||||
message_routing := _,
|
|
||||||
retry_ctx := undefined,
|
|
||||||
{retry, message_routing} := _
|
|
||||||
},
|
|
||||||
C3
|
|
||||||
),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_check_container(_) ->
|
t_check_container(_) ->
|
||||||
Cfg = fun(Cfg) ->
|
Cfg = fun(Cfg) ->
|
||||||
Cfg#{
|
Cfg#{
|
||||||
|
@ -500,10 +435,11 @@ t_check_container(_) ->
|
||||||
capacity := 1000
|
capacity := 1000
|
||||||
}
|
}
|
||||||
end,
|
end,
|
||||||
Case = fun() ->
|
Case = fun(#{client := Client} = BucketCfg) ->
|
||||||
C1 = emqx_limiter_container:new(
|
C1 = emqx_limiter_container:get_limiter_by_types(
|
||||||
|
?MODULE,
|
||||||
[message_routing],
|
[message_routing],
|
||||||
#{message_routing => default}
|
#{message_routing => BucketCfg, client => #{message_routing => Client}}
|
||||||
),
|
),
|
||||||
{ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
|
{ok, C2} = emqx_limiter_container:check(1000, message_routing, C1),
|
||||||
{pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
|
{pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2),
|
||||||
|
@ -514,7 +450,39 @@ t_check_container(_) ->
|
||||||
RetryData = emqx_limiter_container:get_retry_context(C5),
|
RetryData = emqx_limiter_container:get_retry_context(C5),
|
||||||
?assertEqual(Context, RetryData)
|
?assertEqual(Context, RetryData)
|
||||||
end,
|
end,
|
||||||
with_per_client(default, Cfg, Case).
|
with_per_client(Cfg, Case).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test Override
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
t_bucket_no_client(_) ->
|
||||||
|
Rate = ?RATE("1/s"),
|
||||||
|
GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
|
||||||
|
Cfg#{client := Client#{message_routing := MR#{rate := Rate}}}
|
||||||
|
end,
|
||||||
|
BucketMod = fun(Bucket) ->
|
||||||
|
maps:remove(client, Bucket)
|
||||||
|
end,
|
||||||
|
Case = fun() ->
|
||||||
|
Limiter = connect(BucketMod(make_limiter_cfg())),
|
||||||
|
?assertMatch(#{rate := Rate}, Limiter)
|
||||||
|
end,
|
||||||
|
with_global(GlobalMod, [BucketMod], Case).
|
||||||
|
|
||||||
|
t_bucket_client(_) ->
|
||||||
|
GlobalRate = ?RATE("1/s"),
|
||||||
|
BucketRate = ?RATE("10/s"),
|
||||||
|
GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
|
||||||
|
Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}}
|
||||||
|
end,
|
||||||
|
BucketMod = fun(#{client := Client} = Bucket) ->
|
||||||
|
Bucket#{client := Client#{rate := BucketRate}}
|
||||||
|
end,
|
||||||
|
Case = fun() ->
|
||||||
|
Limiter = connect(BucketMod(make_limiter_cfg())),
|
||||||
|
?assertMatch(#{rate := BucketRate}, Limiter)
|
||||||
|
end,
|
||||||
|
with_global(GlobalMod, [BucketMod], Case).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases misc
|
%% Test Cases misc
|
||||||
|
@ -607,19 +575,23 @@ t_schema_unit(_) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
start_client(Name, EndTime, Counter, Number) ->
|
start_client(Cfg, EndTime, Counter, Number) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(_) ->
|
fun(_) ->
|
||||||
spawn(fun() ->
|
spawn(fun() ->
|
||||||
start_client(Name, EndTime, Counter)
|
do_start_client(Cfg, EndTime, Counter)
|
||||||
end)
|
end)
|
||||||
end,
|
end,
|
||||||
lists:seq(1, Number)
|
lists:seq(1, Number)
|
||||||
).
|
).
|
||||||
|
|
||||||
start_client(Name, EndTime, Counter) ->
|
do_start_client({Name, CfgFun}, EndTime, Counter) ->
|
||||||
#{per_client := PerClient} =
|
do_start_client(Name, CfgFun(make_limiter_cfg()), EndTime, Counter);
|
||||||
emqx_config:get([limiter, message_routing, bucket, Name]),
|
do_start_client(Cfg, EndTime, Counter) ->
|
||||||
|
do_start_client(?MODULE, Cfg, EndTime, Counter).
|
||||||
|
|
||||||
|
do_start_client(Name, Cfg, EndTime, Counter) ->
|
||||||
|
#{client := PerClient} = Cfg,
|
||||||
#{rate := Rate} = PerClient,
|
#{rate := Rate} = PerClient,
|
||||||
Client = #client{
|
Client = #client{
|
||||||
start = ?NOW,
|
start = ?NOW,
|
||||||
|
@ -627,7 +599,7 @@ start_client(Name, EndTime, Counter) ->
|
||||||
counter = Counter,
|
counter = Counter,
|
||||||
obtained = 0,
|
obtained = 0,
|
||||||
rate = Rate,
|
rate = Rate,
|
||||||
client = connect(Name)
|
client = connect(Name, Cfg)
|
||||||
},
|
},
|
||||||
client_loop(Client).
|
client_loop(Client).
|
||||||
|
|
||||||
|
@ -711,35 +683,50 @@ to_rate(Str) ->
|
||||||
{ok, Rate} = emqx_limiter_schema:to_rate(Str),
|
{ok, Rate} = emqx_limiter_schema:to_rate(Str),
|
||||||
Rate.
|
Rate.
|
||||||
|
|
||||||
with_global(Modifier, BuckeTemps, Case) ->
|
with_global(Modifier, Buckets, Case) ->
|
||||||
Fun = fun(Cfg) ->
|
with_config([limiter], Modifier, Buckets, Case).
|
||||||
#{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg),
|
|
||||||
Fun = fun({Name, BMod}, Acc) ->
|
|
||||||
Acc#{Name => BMod(BucketCfg)}
|
|
||||||
end,
|
|
||||||
Buckets = lists:foldl(Fun, #{}, BuckeTemps),
|
|
||||||
Cfg2#{bucket := Buckets}
|
|
||||||
end,
|
|
||||||
|
|
||||||
with_config([limiter, message_routing], Fun, Case).
|
with_bucket(Modifier, Case) ->
|
||||||
|
Cfg = Modifier(make_limiter_cfg()),
|
||||||
|
add_bucket(Cfg),
|
||||||
|
Case(Cfg),
|
||||||
|
del_bucket().
|
||||||
|
|
||||||
with_bucket(Bucket, Modifier, Case) ->
|
with_per_client(Modifier, Case) ->
|
||||||
Path = [limiter, message_routing, bucket, Bucket],
|
#{client := Client} = Cfg = make_limiter_cfg(),
|
||||||
with_config(Path, Modifier, Case).
|
Cfg2 = Cfg#{client := Modifier(Client)},
|
||||||
|
add_bucket(Cfg2),
|
||||||
|
Case(Cfg2),
|
||||||
|
del_bucket().
|
||||||
|
|
||||||
with_per_client(Bucket, Modifier, Case) ->
|
with_config(Path, Modifier, Buckets, Case) ->
|
||||||
Path = [limiter, message_routing, bucket, Bucket, per_client],
|
|
||||||
with_config(Path, Modifier, Case).
|
|
||||||
|
|
||||||
with_config(Path, Modifier, Case) ->
|
|
||||||
Cfg = emqx_config:get(Path),
|
Cfg = emqx_config:get(Path),
|
||||||
NewCfg = Modifier(Cfg),
|
NewCfg = Modifier(Cfg),
|
||||||
ct:pal("test with config:~p~n", [NewCfg]),
|
|
||||||
emqx_config:put(Path, NewCfg),
|
emqx_config:put(Path, NewCfg),
|
||||||
emqx_limiter_server:restart(message_routing),
|
emqx_limiter_server:restart(message_routing),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
|
BucketCfg = make_limiter_cfg(),
|
||||||
|
lists:foreach(
|
||||||
|
fun
|
||||||
|
({Name, BucketFun}) ->
|
||||||
|
add_bucket(Name, BucketFun(BucketCfg));
|
||||||
|
(BucketFun) ->
|
||||||
|
add_bucket(BucketFun(BucketCfg))
|
||||||
|
end,
|
||||||
|
Buckets
|
||||||
|
),
|
||||||
DelayReturn = delay_return(Case),
|
DelayReturn = delay_return(Case),
|
||||||
|
lists:foreach(
|
||||||
|
fun
|
||||||
|
({Name, _Cfg}) ->
|
||||||
|
del_bucket(Name);
|
||||||
|
(_Cfg) ->
|
||||||
|
del_bucket()
|
||||||
|
end,
|
||||||
|
Buckets
|
||||||
|
),
|
||||||
emqx_config:put(Path, Cfg),
|
emqx_config:put(Path, Cfg),
|
||||||
|
emqx_limiter_server:restart(message_routing),
|
||||||
DelayReturn().
|
DelayReturn().
|
||||||
|
|
||||||
delay_return(Case) ->
|
delay_return(Case) ->
|
||||||
|
@ -751,10 +738,40 @@ delay_return(Case) ->
|
||||||
fun() -> erlang:raise(Type, Reason, Trace) end
|
fun() -> erlang:raise(Type, Reason, Trace) end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
connect(Name) ->
|
connect({Name, CfgFun}) ->
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(message_routing, Name),
|
connect(Name, CfgFun(make_limiter_cfg()));
|
||||||
|
connect(Cfg) ->
|
||||||
|
connect(?MODULE, Cfg).
|
||||||
|
|
||||||
|
connect(Name, Cfg) ->
|
||||||
|
{ok, Limiter} = emqx_limiter_server:connect(Name, message_routing, Cfg),
|
||||||
Limiter.
|
Limiter.
|
||||||
|
|
||||||
|
make_limiter_cfg() ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
Client = #{
|
||||||
|
rate => Infinity,
|
||||||
|
initial => 0,
|
||||||
|
capacity => Infinity,
|
||||||
|
low_watermark => 0,
|
||||||
|
divisible => false,
|
||||||
|
max_retry_time => timer:seconds(5),
|
||||||
|
failure_strategy => force
|
||||||
|
},
|
||||||
|
#{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
|
||||||
|
|
||||||
|
add_bucket(Cfg) ->
|
||||||
|
add_bucket(?MODULE, Cfg).
|
||||||
|
|
||||||
|
add_bucket(Name, Cfg) ->
|
||||||
|
emqx_limiter_server:add_bucket(Name, message_routing, Cfg).
|
||||||
|
|
||||||
|
del_bucket() ->
|
||||||
|
del_bucket(?MODULE).
|
||||||
|
|
||||||
|
del_bucket(Name) ->
|
||||||
|
emqx_limiter_server:del_bucket(Name, message_routing).
|
||||||
|
|
||||||
check_average_rate(Counter, Second, Rate) ->
|
check_average_rate(Counter, Second, Rate) ->
|
||||||
Cost = counters:get(Counter, 1),
|
Cost = counters:get(Counter, 1),
|
||||||
PerSec = Cost / Second,
|
PerSec = Cost / Second,
|
||||||
|
|
|
@ -59,6 +59,7 @@ init_per_testcase(TestCase, Config) when
|
||||||
TestCase =/= t_ws_pingreq_before_connected,
|
TestCase =/= t_ws_pingreq_before_connected,
|
||||||
TestCase =/= t_ws_non_check_origin
|
TestCase =/= t_ws_non_check_origin
|
||||||
->
|
->
|
||||||
|
add_bucket(),
|
||||||
%% Meck Cm
|
%% Meck Cm
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
||||||
|
@ -96,6 +97,7 @@ init_per_testcase(TestCase, Config) when
|
||||||
| Config
|
| Config
|
||||||
];
|
];
|
||||||
init_per_testcase(t_ws_non_check_origin, Config) ->
|
init_per_testcase(t_ws_non_check_origin, Config) ->
|
||||||
|
add_bucket(),
|
||||||
ok = emqx_common_test_helpers:start_apps([]),
|
ok = emqx_common_test_helpers:start_apps([]),
|
||||||
PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
|
PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
|
||||||
emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false),
|
emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false),
|
||||||
|
@ -105,6 +107,7 @@ init_per_testcase(t_ws_non_check_origin, Config) ->
|
||||||
| Config
|
| Config
|
||||||
];
|
];
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
add_bucket(),
|
||||||
PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
|
PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]),
|
||||||
ok = emqx_common_test_helpers:start_apps([]),
|
ok = emqx_common_test_helpers:start_apps([]),
|
||||||
[
|
[
|
||||||
|
@ -119,6 +122,7 @@ end_per_testcase(TestCase, _Config) when
|
||||||
TestCase =/= t_ws_non_check_origin,
|
TestCase =/= t_ws_non_check_origin,
|
||||||
TestCase =/= t_ws_pingreq_before_connected
|
TestCase =/= t_ws_pingreq_before_connected
|
||||||
->
|
->
|
||||||
|
del_bucket(),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun meck:unload/1,
|
fun meck:unload/1,
|
||||||
[
|
[
|
||||||
|
@ -131,11 +135,13 @@ end_per_testcase(TestCase, _Config) when
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
end_per_testcase(t_ws_non_check_origin, Config) ->
|
end_per_testcase(t_ws_non_check_origin, Config) ->
|
||||||
|
del_bucket(),
|
||||||
PrevConfig = ?config(prev_config, Config),
|
PrevConfig = ?config(prev_config, Config),
|
||||||
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
|
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
emqx_common_test_helpers:stop_apps([]),
|
||||||
ok;
|
ok;
|
||||||
end_per_testcase(_, Config) ->
|
end_per_testcase(_, Config) ->
|
||||||
|
del_bucket(),
|
||||||
PrevConfig = ?config(prev_config, Config),
|
PrevConfig = ?config(prev_config, Config),
|
||||||
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
|
emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig),
|
||||||
emqx_common_test_helpers:stop_apps([]),
|
emqx_common_test_helpers:stop_apps([]),
|
||||||
|
@ -501,15 +507,12 @@ t_handle_timeout_emit_stats(_) ->
|
||||||
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
|
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
|
||||||
|
|
||||||
t_ensure_rate_limit(_) ->
|
t_ensure_rate_limit(_) ->
|
||||||
%% XXX In the future, limiter should provide API for config update
|
|
||||||
Path = [limiter, bytes_in, bucket, default, per_client],
|
|
||||||
PerClient = emqx_config:get(Path),
|
|
||||||
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
{ok, Rate} = emqx_limiter_schema:to_rate("50MB"),
|
||||||
emqx_config:put(Path, PerClient#{rate := Rate}),
|
Limiter = init_limiter(#{
|
||||||
emqx_limiter_server:restart(bytes_in),
|
bytes_in => bucket_cfg(),
|
||||||
timer:sleep(100),
|
message_in => bucket_cfg(),
|
||||||
|
client => #{bytes_in => client_cfg(Rate)}
|
||||||
Limiter = init_limiter(),
|
}),
|
||||||
St = st(#{limiter => Limiter}),
|
St = st(#{limiter => Limiter}),
|
||||||
|
|
||||||
%% must bigger than value in emqx_ratelimit_SUITE
|
%% must bigger than value in emqx_ratelimit_SUITE
|
||||||
|
@ -522,11 +525,7 @@ t_ensure_rate_limit(_) ->
|
||||||
St
|
St
|
||||||
),
|
),
|
||||||
?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
|
?assertEqual(blocked, ?ws_conn:info(sockstate, St1)),
|
||||||
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)),
|
?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)).
|
||||||
|
|
||||||
emqx_config:put(Path, PerClient),
|
|
||||||
emqx_limiter_server:restart(bytes_in),
|
|
||||||
timer:sleep(100).
|
|
||||||
|
|
||||||
t_parse_incoming(_) ->
|
t_parse_incoming(_) ->
|
||||||
{Packets, St} = ?ws_conn:parse_incoming(<<48, 3>>, [], st()),
|
{Packets, St} = ?ws_conn:parse_incoming(<<48, 3>>, [], st()),
|
||||||
|
@ -691,7 +690,44 @@ ws_client(State) ->
|
||||||
ct:fail(ws_timeout)
|
ct:fail(ws_timeout)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
limiter_cfg() -> #{bytes_in => default, message_in => default}.
|
-define(LIMITER_ID, 'ws:default').
|
||||||
|
|
||||||
init_limiter() ->
|
init_limiter() ->
|
||||||
emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()).
|
init_limiter(limiter_cfg()).
|
||||||
|
|
||||||
|
init_limiter(LimiterCfg) ->
|
||||||
|
emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg).
|
||||||
|
|
||||||
|
limiter_cfg() ->
|
||||||
|
Cfg = bucket_cfg(),
|
||||||
|
Client = client_cfg(),
|
||||||
|
#{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}.
|
||||||
|
|
||||||
|
client_cfg() ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
client_cfg(Infinity).
|
||||||
|
|
||||||
|
client_cfg(Rate) ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
#{
|
||||||
|
rate => Rate,
|
||||||
|
initial => 0,
|
||||||
|
capacity => Infinity,
|
||||||
|
low_watermark => 1,
|
||||||
|
divisible => false,
|
||||||
|
max_retry_time => timer:seconds(5),
|
||||||
|
failure_strategy => force
|
||||||
|
}.
|
||||||
|
|
||||||
|
bucket_cfg() ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
#{rate => Infinity, initial => 0, capacity => Infinity}.
|
||||||
|
|
||||||
|
add_bucket() ->
|
||||||
|
Cfg = bucket_cfg(),
|
||||||
|
emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg),
|
||||||
|
emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).
|
||||||
|
|
||||||
|
del_bucket() ->
|
||||||
|
emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in),
|
||||||
|
emqx_limiter_server:del_bucket(?LIMITER_ID, message_in).
|
||||||
|
|
|
@ -777,6 +777,8 @@ to_bin(List) when is_list(List) ->
|
||||||
end;
|
end;
|
||||||
to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
|
to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
|
||||||
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||||
|
to_bin({Type, Args}) ->
|
||||||
|
unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args]));
|
||||||
to_bin(X) ->
|
to_bin(X) ->
|
||||||
X.
|
X.
|
||||||
|
|
||||||
|
|
|
@ -348,12 +348,16 @@ enable_retainer(
|
||||||
#{context_id := ContextId} = State,
|
#{context_id := ContextId} = State,
|
||||||
#{
|
#{
|
||||||
msg_clear_interval := ClearInterval,
|
msg_clear_interval := ClearInterval,
|
||||||
backend := BackendCfg
|
backend := BackendCfg,
|
||||||
|
flow_control := FlowControl
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
NewContextId = ContextId + 1,
|
NewContextId = ContextId + 1,
|
||||||
Context = create_resource(new_context(NewContextId), BackendCfg),
|
Context = create_resource(new_context(NewContextId), BackendCfg),
|
||||||
load(Context),
|
load(Context),
|
||||||
|
emqx_limiter_server:add_bucket(
|
||||||
|
?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
|
||||||
|
),
|
||||||
State#{
|
State#{
|
||||||
enable := true,
|
enable := true,
|
||||||
context_id := NewContextId,
|
context_id := NewContextId,
|
||||||
|
@ -369,6 +373,7 @@ disable_retainer(
|
||||||
} = State
|
} = State
|
||||||
) ->
|
) ->
|
||||||
unload(),
|
unload(),
|
||||||
|
emqx_limiter_server:del_bucket(?APP, internal),
|
||||||
ok = close_resource(Context),
|
ok = close_resource(Context),
|
||||||
State#{
|
State#{
|
||||||
enable := false,
|
enable := false,
|
||||||
|
|
|
@ -151,13 +151,8 @@ config(get, _) ->
|
||||||
{200, emqx:get_raw_config([retainer])};
|
{200, emqx:get_raw_config([retainer])};
|
||||||
config(put, #{body := Body}) ->
|
config(put, #{body := Body}) ->
|
||||||
try
|
try
|
||||||
check_bucket_exists(
|
{ok, _} = emqx_retainer:update_config(Body),
|
||||||
Body,
|
|
||||||
fun(Conf) ->
|
|
||||||
{ok, _} = emqx_retainer:update_config(Conf),
|
|
||||||
{200, emqx:get_raw_config([retainer])}
|
{200, emqx:get_raw_config([retainer])}
|
||||||
end
|
|
||||||
)
|
|
||||||
catch
|
catch
|
||||||
_:Reason:_ ->
|
_:Reason:_ ->
|
||||||
{400, #{
|
{400, #{
|
||||||
|
@ -237,30 +232,3 @@ check_backend(Type, Params, Cont) ->
|
||||||
_ ->
|
_ ->
|
||||||
{400, 'BAD_REQUEST', <<"This API only support built in database">>}
|
{400, 'BAD_REQUEST', <<"This API only support built in database">>}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_bucket_exists(
|
|
||||||
#{
|
|
||||||
<<"flow_control">> :=
|
|
||||||
#{<<"batch_deliver_limiter">> := Name} = Flow
|
|
||||||
} = Conf,
|
|
||||||
Cont
|
|
||||||
) ->
|
|
||||||
case erlang:binary_to_atom(Name) of
|
|
||||||
'' ->
|
|
||||||
%% workaround, empty string means set the value to undefined,
|
|
||||||
%% but now, we can't store `undefined` in the config file correct,
|
|
||||||
%% but, we can delete this field
|
|
||||||
Cont(Conf#{
|
|
||||||
<<"flow_control">> := maps:remove(<<"batch_deliver_limiter">>, Flow)
|
|
||||||
});
|
|
||||||
Bucket ->
|
|
||||||
Path = emqx_limiter_schema:get_bucket_cfg_path(batch, Bucket),
|
|
||||||
case emqx:get_config(Path, undefined) of
|
|
||||||
undefined ->
|
|
||||||
{400, 'BAD_REQUEST', <<"The limiter bucket not exists">>};
|
|
||||||
_ ->
|
|
||||||
Cont(Conf)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
check_bucket_exists(Conf, Cont) ->
|
|
||||||
Cont(Conf).
|
|
||||||
|
|
|
@ -115,8 +115,8 @@ start_link(Pool, Id) ->
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
|
BucketCfg = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
|
{ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
|
||||||
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
|
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -155,8 +155,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
|
||||||
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
|
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
|
||||||
{noreply, State#{limiter := Limiter2}};
|
{noreply, State#{limiter := Limiter2}};
|
||||||
handle_cast({refresh_limiter, Conf}, State) ->
|
handle_cast({refresh_limiter, Conf}, State) ->
|
||||||
BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
|
BucketCfg = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
|
||||||
{ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
|
{ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
|
||||||
{noreply, State#{limiter := Limiter}};
|
{noreply, State#{limiter := Limiter}};
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
|
|
|
@ -86,7 +86,7 @@ fields(flow_control) ->
|
||||||
)},
|
)},
|
||||||
{batch_deliver_limiter,
|
{batch_deliver_limiter,
|
||||||
sc(
|
sc(
|
||||||
emqx_limiter_schema:bucket_name(),
|
?R_REF(emqx_limiter_schema, internal),
|
||||||
batch_deliver_limiter,
|
batch_deliver_limiter,
|
||||||
undefined
|
undefined
|
||||||
)}
|
)}
|
||||||
|
|
|
@ -368,27 +368,16 @@ t_stop_publish_clear_msg(_) ->
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_flow_control(_) ->
|
t_flow_control(_) ->
|
||||||
#{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
|
Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
|
||||||
RetainerCfg2 = RetainerCfg#{
|
LimiterCfg = make_limiter_cfg(Rate),
|
||||||
per_client :=
|
JsonCfg = make_limiter_json(<<"1/1s">>),
|
||||||
PerClient#{
|
emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
|
||||||
rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
|
|
||||||
capacity := 1
|
|
||||||
}
|
|
||||||
},
|
|
||||||
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
|
|
||||||
emqx_limiter_manager:restart_server(batch),
|
|
||||||
timer:sleep(500),
|
|
||||||
|
|
||||||
emqx_retainer_dispatcher:refresh_limiter(),
|
|
||||||
timer:sleep(500),
|
|
||||||
|
|
||||||
emqx_retainer:update_config(#{
|
emqx_retainer:update_config(#{
|
||||||
<<"flow_control">> =>
|
<<"flow_control">> =>
|
||||||
#{
|
#{
|
||||||
<<"batch_read_number">> => 1,
|
<<"batch_read_number">> => 1,
|
||||||
<<"batch_deliver_number">> => 1,
|
<<"batch_deliver_number">> => 1,
|
||||||
<<"batch_deliver_limiter">> => retainer
|
<<"batch_deliver_limiter">> => JsonCfg
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
@ -424,13 +413,14 @@ t_flow_control(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1),
|
ok = emqtt:disconnect(C1),
|
||||||
|
|
||||||
%% recover the limiter
|
emqx_limiter_server:del_bucket(emqx_retainer, internal),
|
||||||
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg),
|
emqx_retainer:update_config(#{
|
||||||
emqx_limiter_manager:restart_server(batch),
|
<<"flow_control">> =>
|
||||||
timer:sleep(500),
|
#{
|
||||||
|
<<"batch_read_number">> => 1,
|
||||||
emqx_retainer_dispatcher:refresh_limiter(),
|
<<"batch_deliver_number">> => 1
|
||||||
timer:sleep(500),
|
}
|
||||||
|
}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_clear_expired(_) ->
|
t_clear_expired(_) ->
|
||||||
|
@ -684,3 +674,33 @@ with_conf(ConfMod, Case) ->
|
||||||
emqx_retainer:update_config(Conf),
|
emqx_retainer:update_config(Conf),
|
||||||
erlang:raise(Type, Error, Strace)
|
erlang:raise(Type, Error, Strace)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_limiter_cfg(Rate) ->
|
||||||
|
Infinity = emqx_limiter_schema:infinity_value(),
|
||||||
|
Client = #{
|
||||||
|
rate => Rate,
|
||||||
|
initial => 0,
|
||||||
|
capacity => Infinity,
|
||||||
|
low_watermark => 1,
|
||||||
|
divisible => false,
|
||||||
|
max_retry_time => timer:seconds(5),
|
||||||
|
failure_strategy => force
|
||||||
|
},
|
||||||
|
#{client => Client, rate => Infinity, initial => 0, capacity => Infinity}.
|
||||||
|
|
||||||
|
make_limiter_json(Rate) ->
|
||||||
|
Client = #{
|
||||||
|
<<"rate">> => Rate,
|
||||||
|
<<"initial">> => 0,
|
||||||
|
<<"capacity">> => <<"infinity">>,
|
||||||
|
<<"low_watermark">> => 0,
|
||||||
|
<<"divisible">> => <<"false">>,
|
||||||
|
<<"max_retry_time">> => <<"5s">>,
|
||||||
|
<<"failure_strategy">> => <<"force">>
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
<<"client">> => Client,
|
||||||
|
<<"rate">> => <<"infinity">>,
|
||||||
|
<<"initial">> => 0,
|
||||||
|
<<"capacity">> => <<"infinity">>
|
||||||
|
}.
|
||||||
|
|
Loading…
Reference in New Issue