Merge pull request #10625 from lafirest/refactor/limiter_cfg

refactor(limiter): simplify limiter configuration
This commit is contained in:
lafirest 2023-05-09 11:40:05 +08:00 committed by GitHub
commit b94290db58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 271 additions and 35 deletions

View File

@ -32,9 +32,13 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
short_paths/0,
calc_capacity/1,
extract_with_type/2,
default_client_config/0
default_client_config/0,
short_paths_fields/1,
get_listener_opts/1,
get_node_opts/1
]).
-define(KILOBYTE, 1024).
@ -104,15 +108,17 @@ roots() ->
].
fields(limiter) ->
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
short_paths_fields(?MODULE) ++
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
importance => ?IMPORTANCE_HIDDEN,
required => {false, recursively},
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
@ -203,6 +209,14 @@ fields(listener_client_fields) ->
fields(Type) ->
simple_bucket_field(Type).
short_paths_fields(DesModule) ->
[
{Name,
?HOCON(rate(), #{desc => ?DESC(DesModule, Name), required => false, example => Example})}
|| {Name, Example} <-
lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
].
desc(limiter) ->
"Settings for the rate limiter.";
desc(node_opts) ->
@ -236,6 +250,9 @@ get_bucket_cfg_path(Type, BucketName) ->
types() ->
[bytes, messages, connection, message_routing, internal].
short_paths() ->
[max_conn_rate, messages_rate, bytes_rate].
calc_capacity(#{rate := infinity}) ->
infinity;
calc_capacity(#{rate := Rate, burst := Burst}) ->
@ -266,6 +283,31 @@ default_client_config() ->
failure_strategy => force
}.
default_bucket_config() ->
#{
rate => infinity,
burst => 0
}.
get_listener_opts(Conf) ->
Limiter = maps:get(limiter, Conf, undefined),
ShortPaths = maps:with(short_paths(), Conf),
get_listener_opts(Limiter, ShortPaths).
get_node_opts(Type) ->
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
case type_to_short_path_name(Type) of
undefined ->
Opts;
Name ->
case emqx:get_config([limiter, Name], undefined) of
undefined ->
Opts;
Rate ->
Opts#{rate := Rate}
end
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -476,3 +518,42 @@ merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.
short_path_name_to_type(max_conn_rate) ->
connection;
short_path_name_to_type(messages_rate) ->
messages;
short_path_name_to_type(bytes_rate) ->
bytes.
type_to_short_path_name(connection) ->
max_conn_rate;
type_to_short_path_name(messages) ->
messages_rate;
type_to_short_path_name(bytes) ->
bytes_rate;
type_to_short_path_name(_) ->
undefined.
get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
Limiter;
get_listener_opts(undefined, ShortPaths) ->
convert_listener_short_paths(ShortPaths);
get_listener_opts(Limiter, ShortPaths) ->
Shorts = convert_listener_short_paths(ShortPaths),
emqx_utils_maps:deep_merge(Limiter, Shorts).
convert_listener_short_paths(ShortPaths) ->
DefBucket = default_bucket_config(),
DefClient = default_client_config(),
Fun = fun(Name, Rate, Acc) ->
Type = short_path_name_to_type(Name),
case Name of
max_conn_rate ->
Acc#{Type => DefBucket#{rate => Rate}};
_ ->
Client = maps:get(client, Acc, #{}),
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
end
end,
maps:fold(Fun, #{}, ShortPaths).

View File

@ -481,7 +481,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) when is_atom(Type) ->
Cfg = emqx:get_config([limiter, Type]),
Cfg = emqx_limiter_schema:get_node_opts(Type),
init_tree(Type, Cfg).
init_tree(Type, #{rate := Rate} = Cfg) ->
@ -625,13 +625,10 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
{error, invalid_bucket}
end;
%% this is a node-level reference
find_referenced_bucket(Id, Type, _) ->
case emqx:get_config([limiter, Type], undefined) of
find_referenced_bucket(_Id, Type, _) ->
case emqx_limiter_schema:get_node_opts(Type) of
#{rate := infinity} ->
false;
undefined ->
?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
{error, invalid_bucket};
NodeCfg ->
{ok, Bucket} = emqx_limiter_manager:find_root(Type),
{ok, Bucket, NodeCfg}

View File

@ -86,7 +86,7 @@ init([]) ->
%% Internal functions
%%--==================================================================
make_child(Type) ->
Cfg = emqx:get_config([limiter, Type]),
Cfg = emqx_limiter_schema:get_node_opts(Type),
make_child(Type, Cfg).
make_child(Type, Cfg) ->

View File

@ -651,7 +651,7 @@ zone(Opts) ->
maps:get(zone, Opts, undefined).
limiter(Opts) ->
maps:get(limiter, Opts, undefined).
emqx_limiter_schema:get_listener_opts(Opts).
add_limiter_bucket(Id, #{limiter := Limiter}) ->
maps:fold(

View File

@ -2001,7 +2001,8 @@ base_listener(Bind) ->
listener_fields
),
#{
desc => ?DESC(base_listener_limiter)
desc => ?DESC(base_listener_limiter),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"enable_authn",
@ -2012,7 +2013,7 @@ base_listener(Bind) ->
default => true
}
)}
].
] ++ emqx_limiter_schema:short_paths_fields(?MODULE).
desc("persistent_session_store") ->
"Settings for message persistence.";

View File

@ -47,7 +47,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF),
load_conf(),
emqx_common_test_helpers:start_apps([?APP]),
Config.
@ -55,13 +55,15 @@ end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([?APP]).
init_per_testcase(_TestCase, Config) ->
emqx_config:erase(limiter),
load_conf(),
Config.
end_per_testcase(_TestCase, Config) ->
Config.
load_conf() ->
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
init_config() ->
emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).
@ -313,8 +315,8 @@ t_capacity(_) ->
%% Test Cases Global Level
%%--------------------------------------------------------------------
t_collaborative_alloc(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
GlobalMod = fun(Cfg) ->
Cfg#{message_routing => #{rate => ?RATE("600/1s"), burst => 0}}
end,
Bucket1 = fun(#{client := Cli} = Bucket) ->
@ -353,11 +355,11 @@ t_collaborative_alloc(_) ->
).
t_burst(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
GlobalMod = fun(Cfg) ->
Cfg#{
message_routing := MR#{
rate := ?RATE("200/1s"),
burst := ?RATE("400/1s")
message_routing => #{
rate => ?RATE("200/1s"),
burst => ?RATE("400/1s")
}
}
end,
@ -653,16 +655,16 @@ t_not_exists_instance(_) ->
),
?assertEqual(
{error, invalid_bucket},
{ok, infinity},
emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
),
ok.
t_create_instance_with_node(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
GlobalMod = fun(Cfg) ->
Cfg#{
message_routing := MR#{rate := ?RATE("200/1s")},
messages := MR#{rate := ?RATE("200/1s")}
message_routing => #{rate => ?RATE("200/1s"), burst => 0},
messages => #{rate => ?RATE("200/1s"), burst => 0}
}
end,
@ -739,6 +741,68 @@ t_esockd_htb_consume(_) ->
?assertMatch({ok, _}, C2R),
ok.
%%--------------------------------------------------------------------
%% Test Cases short paths
%%--------------------------------------------------------------------
t_node_short_paths(_) ->
CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 10.0}, Accessor(messages)),
?assertMatch(#{rate := 1.0}, Accessor(bytes)),
?assertMatch(#{rate := infinity}, Accessor(message_routing)),
?assertEqual(undefined, emqx:get_config([limiter, connection], undefined)).
t_compatibility_for_node_short_paths(_) ->
CfgStr =
<<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 20.0}, Accessor(bytes)).
t_listener_short_paths(_) ->
CfgStr = <<
""
"listeners.tcp.default {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}"
""
>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertMatch(
#{
client := #{
messages := #{rate := 10.0},
bytes := #{rate := 1.0}
},
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).
t_compatibility_for_listener_short_paths(_) ->
CfgStr = <<
"" "listeners.tcp.default {max_conn_rate = \"1000\", limiter.connection.rate = \"500\"}" ""
>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertMatch(
#{
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).
t_no_limiter_for_listener(_) ->
CfgStr = <<>>,
ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertEqual(
undefined,
emqx_limiter_schema:get_listener_opts(ListenerOpt)
).
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
@ -1043,3 +1107,11 @@ make_create_test_data_with_infinity_node(FakeInstnace) ->
%% client = C bucket = B C > B
{MkA(1000, 100), IsRefLimiter(FakeInstnace)}
].
parse_schema(ConfigString) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(
emqx_limiter_schema,
RawConf,
#{required => false, atom_key => false}
).

View File

@ -28,7 +28,8 @@
config_reset/3,
configs/3,
get_full_config/0,
global_zone_configs/3
global_zone_configs/3,
limiter/3
]).
-define(PREFIX, "/configs/").
@ -42,7 +43,6 @@
<<"alarm">>,
<<"sys_topics">>,
<<"sysmon">>,
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"zones">>
@ -57,7 +57,8 @@ paths() ->
[
"/configs",
"/configs_reset/:rootname",
"/configs/global_zone"
"/configs/global_zone",
"/configs/limiter"
] ++
lists:map(fun({Name, _Type}) -> ?PREFIX ++ binary_to_list(Name) end, config_list()).
@ -147,6 +148,28 @@ schema("/configs/global_zone") ->
}
}
};
schema("/configs/limiter") ->
#{
'operationId' => limiter,
get => #{
tags => ?TAGS,
description => <<"Get the node-level limiter configs">>,
responses => #{
200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"config not found">>)
}
},
put => #{
tags => ?TAGS,
description => <<"Update the node-level limiter configs">>,
'requestBody' => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
responses => #{
200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']),
403 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
}
}
};
schema(Path) ->
{RootKey, {_Root, Schema}} = find_schema(Path),
#{
@ -272,6 +295,22 @@ configs(get, Params, _Req) ->
{200, Res}
end.
limiter(get, _Params, _Req) ->
{200, format_limiter_config(get_raw_config(limiter))};
limiter(put, #{body := NewConf}, _Req) ->
case emqx_conf:update([limiter], NewConf, ?OPTS) of
{ok, #{raw_config := RawConf}} ->
{200, format_limiter_config(RawConf)};
{error, {permission_denied, Reason}} ->
{403, #{code => 'UPDATE_FAILED', message => Reason}};
{error, Reason} ->
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
end.
format_limiter_config(RawConf) ->
Shorts = lists:map(fun erlang:atom_to_binary/1, emqx_limiter_schema:short_paths()),
maps:with(Shorts, RawConf).
conf_path_reset(Req) ->
<<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
string:lexemes(Path, "/ ").

View File

@ -0,0 +1,4 @@
Simplify limiter configuration.
- Reduce the complexity of the limiter's configuration.
e.g. now users can use `limiter.messages_rate = 1000/s` to quickly set the node-level limit for the message publish.
- Update the `configs/limiter` API to suit this refactor.

View File

@ -1,5 +1,26 @@
emqx_limiter_schema {
max_conn_rate.desc:
"""Maximum connection rate.<br/>
This is used to limit the connection rate for this node,
once the limit is reached, new connections will be deferred or refused"""
max_conn_rate.label:
"""Maximum Connection Rate"""
messages_rate.desc:
"""Messages publish rate.<br/>
This is used to limit the inbound message numbers for this node,
once the limit is reached, the restricted client will slow down and even be hung for a while."""
messages_rate.label:
"""Messages Publish Rate"""
bytes_rate.desc:
"""Data publish rate.<br/>
This is used to limit the inbound bytes rate for this node,
once the limit is reached, the restricted client will slow down and even be hung for a while."""
bytes_rate.label:
"""Data Publish Rate"""
bucket_cfg.desc:
"""Bucket Configs"""

View File

@ -1033,6 +1033,27 @@ base_listener_limiter.desc:
base_listener_limiter.label:
"""Type of the rate limit."""
max_conn_rate.desc:
"""Maximum connection rate.<br/>
This is used to limit the connection rate for this listener,
once the limit is reached, new connections will be deferred or refused"""
max_conn_rate.label:
"""Maximum Connection Rate"""
messages_rate.desc:
"""Messages publish rate.<br/>
This is used to limit the inbound message numbers for each client connected to this listener,
once the limit is reached, the restricted client will slow down and even be hung for a while."""
messages_rate.label:
"""Messages Publish Rate"""
bytes_rate.desc:
"""Data publish rate.<br/>
This is used to limit the inbound bytes rate for each client connected to this listener,
once the limit is reached, the restricted client will slow down and even be hung for a while."""
bytes_rate.label:
"""Data Publish Rate"""
persistent_session_store_backend.desc:
"""Database management system used to store information about persistent sessions and messages.
- `builtin`: Use the embedded database (mria)"""