Merge pull request #7868 from lafirest/feat/limiter_api

feat(limiter): add limiter view/update api
This commit is contained in:
lafirest 2022-05-05 19:08:47 +08:00 committed by GitHub
commit 97e778e797
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 25 deletions

View File

@ -29,7 +29,8 @@
to_initial/1,
namespace/0,
get_bucket_cfg_path/2,
desc/1
desc/1,
types/0
]).
-define(KILOBYTE, 1024).
@ -187,6 +188,9 @@ to_rate(Str) ->
get_bucket_cfg_path(Type, BucketName) ->
[limiter, Type, bucket, BucketName].
types() ->
[bytes_in, message_in, connection, message_routing, batch].
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -195,18 +199,36 @@ ref(Field) -> hoconsc:ref(?MODULE, Field).
to_burst_rate(Str) ->
to_rate(Str, false, true).
%% rate can be: 10 10MB 10MB/s 10MB/2s infinity
%% e.g. the bytes_in regex tree is:
%%
%% __ infinity
%% | - xMB
%% rate -| |
%% __ ?Size(/?Time) -| - xMB/s
%% | |
%% - xMB/?Time -|
%% - xMB/ys
to_rate(Str, CanInfinity, CanZero) ->
Tokens = [string:trim(T) || T <- string:tokens(Str, "/")],
case Tokens of
["infinity"] when CanInfinity ->
Regex = "^\s*(?:([0-9]+[a-zA-Z]*)(?:/([0-9]*)([m s h d M S H D]{1,2}))?\s*$)|infinity\s*$",
{ok, MP} = re:compile(Regex),
case re:run(Str, MP, [{capture, all_but_first, list}]) of
{match, []} when CanInfinity ->
{ok, infinity};
%% if time unit is 1s, it can be omitted
[QuotaStr] ->
{match, [QuotaStr]} ->
Fun = fun(Quota) ->
{ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
end,
to_capacity(QuotaStr, Str, CanZero, Fun);
[QuotaStr, Interval] ->
{match, [QuotaStr, TimeVal, TimeUnit]} ->
Interval =
case TimeVal of
%% for xM/s
[] -> "1" ++ TimeUnit;
%% for xM/ys
_ -> TimeVal ++ TimeUnit
end,
Fun = fun(Quota) ->
try
case emqx_schema:to_duration_ms(Interval) of
@ -242,11 +264,11 @@ check_capacity(_Str, Quota, _CanZero, Cont) ->
Cont(Quota).
to_capacity(Str) ->
Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$",
Regex = "^\s*(?:([0-9]+)([a-zA-Z]*))|infinity\s*$",
to_quota(Str, Regex).
to_initial(Str) ->
Regex = "^\s*([0-9]+)([a-zA-z]*)\s*$",
Regex = "^\s*([0-9]+)([a-zA-Z]*)\s*$",
to_quota(Str, Regex).
to_quota(Str, Regex) ->

View File

@ -568,11 +568,32 @@ t_decimal(_) ->
t_schema_unit(_) ->
M = emqx_limiter_schema,
?assertEqual(limiter, M:namespace()),
%% infinity
?assertEqual({ok, infinity}, M:to_rate(" infinity ")),
%% xMB
?assertMatch({ok, _}, M:to_rate("100")),
?assertMatch({error, _}, M:to_rate("0")),
?assertMatch({ok, _}, M:to_rate(" 100 ")),
?assertMatch({ok, _}, M:to_rate("100MB")),
%% xMB/s
?assertMatch({ok, _}, M:to_rate("100/s")),
?assertMatch({ok, _}, M:to_rate("100MB/s")),
%% xMB/ys
?assertMatch({ok, _}, M:to_rate("100/10s")),
?assertMatch({ok, _}, M:to_rate("100MB/10s")),
?assertMatch({error, _}, M:to_rate("infini")),
?assertMatch({error, _}, M:to_rate("0")),
?assertMatch({error, _}, M:to_rate("MB")),
?assertMatch({error, _}, M:to_rate("10s")),
?assertMatch({error, _}, M:to_rate("100MB/")),
?assertMatch({error, _}, M:to_rate("100MB/xx")),
?assertMatch({error, _}, M:to_rate("100MB/1")),
?assertMatch({error, _}, M:to_rate("100/10x")),
?assertEqual({ok, infinity}, M:to_capacity("infinity")),
?assertEqual({ok, 100}, M:to_capacity("100")),
?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")),

View File

@ -667,16 +667,16 @@ typename_to_spec("log_level()", _Mod) ->
enum => [debug, info, notice, warning, error, critical, alert, emergency, all]
};
typename_to_spec("rate()", _Mod) ->
#{type => string, example => <<"10M/s">>};
#{type => string, example => <<"10MB">>};
typename_to_spec("capacity()", _Mod) ->
#{type => string, example => <<"100M">>};
#{type => string, example => <<"100MB">>};
typename_to_spec("burst_rate()", _Mod) ->
%% 0/0s = no burst
#{type => string, example => <<"10M/1s">>};
#{type => string, example => <<"10MB">>};
typename_to_spec("failure_strategy()", _Mod) ->
#{type => string, example => <<"force">>};
typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0M">>};
#{type => string, example => <<"0MB">>};
typename_to_spec("bucket_name()", _Mod) ->
#{type => string, example => <<"retainer">>};
typename_to_spec(Name, Mod) ->

View File

@ -25,6 +25,7 @@
-export([
config/3,
limiter/3,
config_reset/3,
configs/3,
get_full_config/0,
@ -71,7 +72,12 @@ api_spec() ->
namespace() -> "configuration".
paths() ->
["/configs", "/configs_reset/:rootname", "/configs/global_zone"] ++
[
"/configs",
"/configs_reset/:rootname",
"/configs/global_zone",
"/configs/limiter/:limiter_type"
] ++
lists:map(fun({Name, _Type}) -> ?PREFIX ++ binary_to_list(Name) end, config_list()).
schema("/configs") ->
@ -156,6 +162,42 @@ schema("/configs/global_zone") ->
}
}
};
schema("/configs/limiter/:limiter_type") ->
Schema = hoconsc:ref(emqx_limiter_schema, limiter_opts),
Parameters = [
{limiter_type,
hoconsc:mk(
hoconsc:enum(emqx_limiter_schema:types()),
#{
in => query,
required => true,
example => <<"bytes_in">>,
desc => <<"The limiter type">>
}
)}
],
#{
'operationId' => limiter,
get => #{
tags => [conf],
description => <<"Get config of this limiter">>,
parameters => Parameters,
responses => #{
200 => Schema,
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"config not found">>)
}
},
put => #{
tags => [conf],
description => <<"Update config of this limiter">>,
parameters => Parameters,
'requestBody' => Schema,
responses => #{
200 => Schema,
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
}
}
};
schema(Path) ->
{RootKey, {_Root, Schema}} = find_schema(Path),
#{
@ -201,18 +243,13 @@ fields(Field) ->
%%%==============================================================================================
%% HTTP API Callbacks
config(get, _Params, Req) ->
config(Method, Params, Req) ->
Path = conf_path(Req),
{ok, Conf} = emqx_map_lib:deep_find(Path, get_full_config()),
{200, Conf};
config(put, #{body := Body}, Req) ->
Path = conf_path(Req),
case emqx_conf:update(Path, Body, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
{200, RawConf};
{error, Reason} ->
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
end.
do_config(Method, Params, Path).
limiter(Method, #{query_string := QS} = Params, _Req) ->
#{<<"limiter_type">> := Type} = QS,
do_config(Method, Params, [<<"limiter">>, erlang:atom_to_binary(Type)]).
global_zone_configs(get, _Params, _Req) ->
Paths = global_zone_roots(),
@ -340,3 +377,14 @@ global_zone_roots() ->
global_zone_schema() ->
Roots = hocon_schema:roots(emqx_zone_schema),
lists:map(fun({RootKey, {_Root, Schema}}) -> {RootKey, Schema} end, Roots).
do_config(get, _Params, Path) ->
{ok, Conf} = emqx_map_lib:deep_find(Path, get_full_config()),
{200, Conf};
do_config(put, #{body := Body}, Path) ->
case emqx_conf:update(Path, Body, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
{200, RawConf};
{error, Reason} ->
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
end.