fix(limiter): lift the level of the `client` field

This commit is contained in:
firest 2022-07-22 17:14:24 +08:00
parent dbab1bc96a
commit 15c8110af2
6 changed files with 107 additions and 63 deletions

View File

@ -89,10 +89,10 @@ the check/consume will succeed, but it will be forced to wait for a short period
} }
} }
per_client { client {
desc { desc {
en: """The rate limit for each user of the bucket, this field is not required""" en: """The rate limit for each user of the bucket"""
zh: """对桶的每个使用者的速率控制设置,这个不是必须的""" zh: """对桶的每个使用者的速率控制设置"""
} }
label: { label: {
en: """Per Client""" en: """Per Client"""

View File

@ -32,8 +32,7 @@
desc/1, desc/1,
types/0, types/0,
infinity_value/0, infinity_value/0,
bucket_opts/0, bucket_fields/1
bucket_opts_meta/0
]). ]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
@ -96,6 +95,16 @@ fields(limiter) ->
default => make_limiter_default(Type) default => make_limiter_default(Type)
})} })}
|| Type <- types() || Type <- types()
] ++
[
{client,
?HOCON(
?R_REF(client_fields),
#{
desc => ?DESC(client),
default => #{}
}
)}
]; ];
fields(node_opts) -> fields(node_opts) ->
[ [
@ -104,15 +113,22 @@ fields(node_opts) ->
?HOCON(burst_rate(), #{ ?HOCON(burst_rate(), #{
desc => ?DESC(burst), desc => ?DESC(burst),
default => 0 default => 0
})}, })}
{client, ?HOCON(?R_REF(client_opts), #{default => #{}})} ];
fields(client_fields) ->
[
{Type,
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
default => #{}
})}
|| Type <- types()
]; ];
fields(bucket_opts) -> fields(bucket_opts) ->
[ [
{rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})},
{capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})}, {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
{initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}, {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}
{client, ?HOCON(?R_REF(client_opts), #{required => false})}
]; ];
fields(client_opts) -> fields(client_opts) ->
[ [
@ -159,14 +175,23 @@ fields(client_opts) ->
default => force default => force
} }
)} )}
]. ];
fields({client_fields, Types}) ->
[
{Type,
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
required => false
})}
|| Type <- Types
];
fields({bucket_fields, Types}) ->
bucket_fields(Types).
desc(limiter) -> desc(limiter) ->
"Settings for the rate limiter."; "Settings for the rate limiter.";
desc(node_opts) -> desc(node_opts) ->
"Settings for the limiter of the node level."; "Settings for the limiter of the node level.";
desc(node_client_opts) ->
"Settings for the client in the node level.";
desc(bucket_opts) -> desc(bucket_opts) ->
"Settings for the bucket."; "Settings for the bucket.";
desc(client_opts) -> desc(client_opts) ->
@ -174,23 +199,37 @@ desc(client_opts) ->
desc(_) -> desc(_) ->
undefined. undefined.
bucket_opts() -> bucket_fields(Type) when is_atom(Type) ->
fields(bucket_opts) ++
[
{client,
?HOCON( ?HOCON(
?MAP("bucket_name", ?R_REF(bucket_opts)), ?R_REF(?MODULE, client_opts),
bucket_opts_meta()
).
bucket_opts_meta() ->
#{ #{
default => #{}, desc => ?DESC(client),
example => required => false
#{
<<"rate">> => <<"infinity">>,
<<"capcity">> => <<"infinity">>,
<<"initial">> => <<"100">>,
<<"client">> => #{<<"rate">> => <<"infinity">>}
} }
}. )}
];
bucket_fields(Types) ->
[
{Type,
?HOCON(?R_REF(?MODULE, bucket_opts), #{
desc => ?DESC(?MODULE, Type),
required => false
})}
|| Type <- Types
] ++
[
{client,
?HOCON(
?R_REF(?MODULE, {client_fields, Types}),
#{
desc => ?DESC(client),
required => false
}
)}
].
%% default period is 100ms %% default period is 100ms
default_period() -> default_period() ->

View File

@ -116,35 +116,28 @@
%% If no bucket path is set in config, there will be no limit %% If no bucket path is set in config, there will be no limit
connect(_Id, _Type, undefined) -> connect(_Id, _Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, emqx_htb_limiter:make_infinity_limiter()};
connect( connect(Id, Type, Cfg) ->
Id, case find_limiter_cfg(Type, Cfg) of
Type, {undefined, _} ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{
#{ #{
rate := BucketRate, rate := BucketRate,
capacity := BucketSize capacity := BucketSize
} = BucketCfg } = BucketCfg,
) -> #{rate := CliRate, capacity := CliSize} = ClientCfg
case emqx_limiter_manager:find_bucket(Id, Type) of } ->
{ok, Bucket} ->
case find_client_cfg(Type, BucketCfg) of
#{rate := CliRate, capacity := CliSize} = ClientCfg ->
{ok, {ok,
if if
CliRate < BucketRate orelse CliSize < BucketSize -> CliRate < BucketRate orelse CliSize < BucketSize ->
emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket); emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, BucketCfg);
true -> true ->
emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket) emqx_htb_limiter:make_ref_limiter(ClientCfg, BucketCfg)
end}; end};
{error, invalid_node_cfg} = Error ->
?SLOG(error, #{msg => "invalid_node_cfg", type => Type, id => Id}),
Error
end;
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
{error, invalid_bucket} {error, invalid_bucket}
end; end.
connect(Id, Type, Paths) ->
connect(Id, Type, maps:get(Type, Paths, undefined)).
-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
add_bucket(_Id, _Type, undefine) -> add_bucket(_Id, _Type, undefine) ->
@ -571,9 +564,16 @@ call(Type, Msg) ->
gen_server:call(Pid, Msg) gen_server:call(Pid, Msg)
end. end.
find_client_cfg(Type, Cfg) -> find_limiter_cfg(Type, #{rate := _} = Cfg) ->
NodeCfg = emqx:get_config([limiter, Type, client], undefined), {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))};
BucketCfg = maps:get(client, Cfg, undefined), find_limiter_cfg(Type, Cfg) ->
{
maps:get(Type, Cfg),
find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined))
}.
find_client_cfg(Type, BucketCfg) ->
NodeCfg = emqx:get_config([limiter, client, Type], undefined),
merge_client_cfg(NodeCfg, BucketCfg). merge_client_cfg(NodeCfg, BucketCfg).
merge_client_cfg(undefined, BucketCfg) -> merge_client_cfg(undefined, BucketCfg) ->

View File

@ -1619,10 +1619,13 @@ base_listener(Bind) ->
)}, )},
{"limiter", {"limiter",
sc( sc(
map("ratelimit_name", ?R_REF(emqx_limiter_schema, bucket_opts)), ?R_REF(
emqx_limiter_schema,
{bucket_fields, [bytes_in, message_in, connection, message_routing]}
),
#{ #{
desc => ?DESC(base_listener_limiter) desc => ?DESC(base_listener_limiter),
%% TODO default => #{<<"connection">> => <<"default">>} default => #{}
} }
)}, )},
{"enable_authn", {"enable_authn",

View File

@ -755,6 +755,8 @@ to_bin(List) when is_list(List) ->
end; end;
to_bin(Boolean) when is_boolean(Boolean) -> Boolean; to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_bin({Type, Args}) ->
unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args]));
to_bin(X) -> to_bin(X) ->
X. X.

View File

@ -86,7 +86,7 @@ fields(flow_control) ->
)}, )},
{batch_deliver_limiter, {batch_deliver_limiter,
sc( sc(
?R_REF(emqx_limiter_schema, bucket_opts), ?R_REF(emqx_limiter_schema, {bucket_fields, internal}),
batch_deliver_limiter, batch_deliver_limiter,
undefined undefined
)} )}