fix(limiter): simplify the code of the limiter

move runtime code out from the schema
This commit is contained in:
firest 2023-08-10 15:29:09 +08:00
parent d1dc37af4c
commit 6dbddfb089
10 changed files with 197 additions and 192 deletions

View File

@ -139,7 +139,7 @@ make_local_limiter(Cfg, Bucket) ->
tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW,
bucket => Bucket,
capacity => emqx_limiter_schema:calc_capacity(Cfg)
capacity => emqx_limiter_utils:calc_capacity(Cfg)
}.
%%@doc create a limiter server's reference

View File

@ -1,36 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_correction).
%% API
-export([add/2]).
-type correction_value() :: #{
correction := emqx_limiter_decimal:zero_or_float(),
any() => any()
}.
-export_type([correction_value/0]).
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
-spec add(number(), correction_value()) -> {integer(), correction_value()}.
add(Inc, #{correction := Correction} = Data) ->
FixedInc = Inc + Correction,
IntInc = erlang:floor(FixedInc),
{IntInc, Data#{correction := FixedInc - IntInc}}.

View File

@ -24,13 +24,19 @@
sub/2,
mul/2,
put_to_counter/3,
floor_div/2
floor_div/2,
precisely_add/2
]).
-export_type([decimal/0, zero_or_float/0]).
-export_type([decimal/0, zero_or_float/0, correction_value/0]).
-type decimal() :: infinity | number().
-type zero_or_float() :: 0 | float().
-type correction_value() :: #{
correction := emqx_limiter_decimal:zero_or_float(),
any() => any()
}.
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
@ -43,6 +49,12 @@ add(A, B) when
add(A, B) ->
A + B.
-spec precisely_add(number(), correction_value()) -> {integer(), correction_value()}.
precisely_add(Inc, #{correction := Correction} = Data) ->
FixedInc = Inc + Correction,
IntInc = erlang:floor(FixedInc),
{IntInc, Data#{correction := FixedInc - IntInc}}.
-spec sub(decimal(), decimal()) -> decimal().
sub(A, B) when
A =:= infinity orelse

View File

@ -131,7 +131,7 @@ delete_root(Type) ->
delete_bucket(?ROOT_ID, Type).
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
Conf = emqx_limiter_schema:convert_node_opts(NewConf),
Conf = emqx_limiter_utils:convert_node_opts(NewConf),
_ = [on_post_config_update(Type, Cfg) || {Type, Cfg} <- maps:to_list(Conf)],
ok.

View File

@ -33,14 +33,7 @@
desc/1,
types/0,
short_paths/0,
calc_capacity/1,
extract_with_type/2,
default_client_config/0,
default_bucket_config/0,
short_paths_fields/1,
get_listener_opts/1,
get_node_opts/1,
convert_node_opts/1
short_paths_fields/1
]).
-define(KILOBYTE, 1024).
@ -263,80 +256,6 @@ types() ->
short_paths() ->
[max_conn_rate, messages_rate, bytes_rate].
calc_capacity(#{rate := infinity}) ->
infinity;
calc_capacity(#{rate := Rate, burst := Burst}) ->
erlang:floor(1000 * Rate / default_period()) + Burst.
extract_with_type(_Type, undefined) ->
undefined;
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
ClientVal = maps:find(Type, ClientCfg),
merge_client_bucket(Type, ClientVal, BucketVal);
extract_with_type(Type, BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
merge_client_bucket(Type, undefined, BucketVal).
%% Since the client configuration can be absent and be a undefined value,
%% but we must need some basic settings to control the behaviour of the limiter,
%% so here add this helper function to generate a default setting.
%% This is a temporary workaround until we found a better way to simplify.
default_client_config() ->
#{
rate => infinity,
initial => 0,
low_watermark => 0,
burst => 0,
divisible => true,
max_retry_time => timer:hours(1),
failure_strategy => force
}.
default_bucket_config() ->
#{
rate => infinity,
burst => 0,
initial => 0
}.
get_listener_opts(Conf) ->
Limiter = maps:get(limiter, Conf, undefined),
ShortPaths = maps:with(short_paths(), Conf),
get_listener_opts(Limiter, ShortPaths).
get_node_opts(Type) ->
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
case type_to_short_path_name(Type) of
undefined ->
Opts;
Name ->
case emqx:get_config([limiter, Name], undefined) of
undefined ->
Opts;
Rate ->
Opts#{rate := Rate}
end
end.
convert_node_opts(Conf) ->
DefBucket = default_bucket_config(),
ShorPaths = short_paths(),
Fun = fun
%% The `client` in the node options was deprecated
(client, _Value, Acc) ->
Acc;
(Name, Value, Acc) ->
case lists:member(Name, ShorPaths) of
true ->
Type = short_path_name_to_type(Name),
Acc#{Type => DefBucket#{rate => Value}};
_ ->
Acc#{Name => Value}
end
end,
maps:fold(Fun, #{}, Conf).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -538,51 +457,3 @@ alias_of_type(bytes) ->
[bytes_in];
alias_of_type(_) ->
[].
merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
#{Type => BucketVal, client => #{Type => ClientVal}};
merge_client_bucket(Type, {ok, ClientVal}, _) ->
#{client => #{Type => ClientVal}};
merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.
short_path_name_to_type(max_conn_rate) ->
connection;
short_path_name_to_type(messages_rate) ->
messages;
short_path_name_to_type(bytes_rate) ->
bytes.
type_to_short_path_name(connection) ->
max_conn_rate;
type_to_short_path_name(messages) ->
messages_rate;
type_to_short_path_name(bytes) ->
bytes_rate;
type_to_short_path_name(_) ->
undefined.
get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
Limiter;
get_listener_opts(undefined, ShortPaths) ->
convert_listener_short_paths(ShortPaths);
get_listener_opts(Limiter, ShortPaths) ->
Shorts = convert_listener_short_paths(ShortPaths),
emqx_utils_maps:deep_merge(Limiter, Shorts).
convert_listener_short_paths(ShortPaths) ->
DefBucket = default_bucket_config(),
DefClient = default_client_config(),
Fun = fun(Name, Rate, Acc) ->
Type = short_path_name_to_type(Name),
case Name of
max_conn_rate ->
Acc#{Type => DefBucket#{rate => Rate}};
_ ->
Client = maps:get(client, Acc, #{}),
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
end
end,
maps:fold(Fun, #{}, ShortPaths).

View File

@ -383,7 +383,7 @@ longitudinal(
case lists:min([ShouldAlloc, Flow, Capacity]) of
Available when Available > 0 ->
{Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
{Inc, Bucket2} = emqx_limiter_decimal:precisely_add(Available, Bucket),
counters:add(Counter, Index, Inc),
{Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}};
@ -419,7 +419,7 @@ maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} =
State;
_ ->
Available = erlang:min(Rate - Token, InFlow),
{Inc, Root2} = emqx_limiter_correction:add(Available, Root),
{Inc, Root2} = emqx_limiter_decimal:precisely_add(Available, Root),
counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
State#{root := Root2}
end.
@ -473,7 +473,7 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
index := Index,
obtained := Obtained
} = Bucket,
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
{Inc, Bucket2} = emqx_limiter_decimal:precisely_add(InFlow, Bucket),
counters:add(Counter, Index, Inc),
@ -484,7 +484,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) when is_atom(Type) ->
Cfg = emqx_limiter_schema:get_node_opts(Type),
Cfg = emqx_limiter_utils:get_node_opts(Type),
init_tree(Type, Cfg).
init_tree(Type, #{rate := Rate} = Cfg) ->
@ -515,7 +515,7 @@ do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
undefined ->
make_bucket(Id, Cfg, State);
Bucket ->
Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)},
Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_utils:calc_capacity(Cfg)},
State#{buckets := Buckets#{Id := Bucket2}}
end.
@ -536,7 +536,7 @@ make_bucket(
rate => Rate,
obtained => Initial,
correction => 0,
capacity => emqx_limiter_schema:calc_capacity(Cfg),
capacity => emqx_limiter_utils:calc_capacity(Cfg),
counter => Counter,
index => NewIndex
},
@ -601,7 +601,7 @@ create_limiter_without_client(Id, Type, BucketCfg) ->
false ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{ok, Bucket, RefCfg} ->
ClientCfg = emqx_limiter_schema:default_client_config(),
ClientCfg = emqx_limiter_utils:default_client_config(),
create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
Error ->
Error
@ -627,7 +627,7 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
end;
%% this is a node-level reference
find_referenced_bucket(_Id, Type, _) ->
case emqx_limiter_schema:get_node_opts(Type) of
case emqx_limiter_utils:get_node_opts(Type) of
#{rate := infinity} ->
false;
NodeCfg ->

View File

@ -86,7 +86,7 @@ init([]) ->
%% Internal functions
%%--==================================================================
make_child(Type) ->
Cfg = emqx_limiter_schema:get_node_opts(Type),
Cfg = emqx_limiter_utils:get_node_opts(Type),
make_child(Type, Cfg).
make_child(Type, Cfg) ->

View File

@ -0,0 +1,158 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_utils).
-export([
calc_capacity/1,
extract_with_type/2,
get_listener_opts/1,
get_node_opts/1,
convert_node_opts/1,
default_client_config/0,
default_bucket_config/0
]).
-import(emqx_limiter_schema, [default_period/0, short_paths/0]).
%%--------------------------------------------------------------------
%% Configuration-related runtime code
%%--------------------------------------------------------------------
calc_capacity(#{rate := infinity}) ->
infinity;
calc_capacity(#{rate := Rate, burst := Burst}) ->
erlang:floor(1000 * Rate / default_period()) + Burst.
%% @doc extract data of a type from the nested config
extract_with_type(_Type, undefined) ->
undefined;
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
ClientVal = maps:find(Type, ClientCfg),
merge_client_bucket(Type, ClientVal, BucketVal);
extract_with_type(Type, BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
merge_client_bucket(Type, undefined, BucketVal).
%% @doc get the limiter configuration from the listener setting
%% and compatible with the old version limiter schema
get_listener_opts(Conf) ->
Limiter = maps:get(limiter, Conf, undefined),
ShortPaths = maps:with(short_paths(), Conf),
get_listener_opts(Limiter, ShortPaths).
get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
Limiter;
get_listener_opts(undefined, ShortPaths) ->
convert_listener_short_paths(ShortPaths);
get_listener_opts(Limiter, ShortPaths) ->
Shorts = convert_listener_short_paths(ShortPaths),
emqx_utils_maps:deep_merge(Limiter, Shorts).
convert_listener_short_paths(ShortPaths) ->
DefBucket = default_bucket_config(),
DefClient = default_client_config(),
Fun = fun(Name, Rate, Acc) ->
Type = short_path_name_to_type(Name),
case Name of
max_conn_rate ->
Acc#{Type => DefBucket#{rate => Rate}};
_ ->
Client = maps:get(client, Acc, #{}),
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
end
end,
maps:fold(Fun, #{}, ShortPaths).
%% @doc get the node-level limiter configuration and compatible with the old version limiter schema
get_node_opts(Type) ->
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
case type_to_short_path_name(Type) of
undefined ->
Opts;
Name ->
case emqx:get_config([limiter, Name], undefined) of
undefined ->
Opts;
Rate ->
Opts#{rate := Rate}
end
end.
convert_node_opts(Conf) ->
DefBucket = default_bucket_config(),
ShorPaths = short_paths(),
Fun = fun
%% The `client` in the node options was deprecated
(client, _Value, Acc) ->
Acc;
(Name, Value, Acc) ->
case lists:member(Name, ShorPaths) of
true ->
Type = short_path_name_to_type(Name),
Acc#{Type => DefBucket#{rate => Value}};
_ ->
Acc#{Name => Value}
end
end,
maps:fold(Fun, #{}, Conf).
merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
#{Type => BucketVal, client => #{Type => ClientVal}};
merge_client_bucket(Type, {ok, ClientVal}, _) ->
#{client => #{Type => ClientVal}};
merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.
short_path_name_to_type(max_conn_rate) ->
connection;
short_path_name_to_type(messages_rate) ->
messages;
short_path_name_to_type(bytes_rate) ->
bytes.
type_to_short_path_name(connection) ->
max_conn_rate;
type_to_short_path_name(messages) ->
messages_rate;
type_to_short_path_name(bytes) ->
bytes_rate;
type_to_short_path_name(_) ->
undefined.
%% Since the client configuration can be absent and be a undefined value,
%% but we must need some basic settings to control the behaviour of the limiter,
%% so here add this helper function to generate a default setting.
%% This is a temporary workaround until we found a better way to simplify.
default_client_config() ->
#{
rate => infinity,
initial => 0,
low_watermark => 0,
burst => 0,
divisible => true,
max_retry_time => timer:hours(1),
failure_strategy => force
}.
default_bucket_config() ->
#{
rate => infinity,
burst => 0,
initial => 0
}.

View File

@ -587,7 +587,7 @@ esockd_opts(ListenerId, Type, Opts0) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
case emqx_limiter_schema:extract_with_type(connection, Limiter) of
case emqx_limiter_utils:extract_with_type(connection, Limiter) of
undefined ->
Opts1;
BucketCfg ->
@ -732,7 +732,7 @@ zone(Opts) ->
maps:get(zone, Opts, undefined).
limiter(Opts) ->
emqx_limiter_schema:get_listener_opts(Opts).
emqx_limiter_utils:get_listener_opts(Opts).
add_limiter_bucket(_Id, undefined) ->
ok;

View File

@ -589,11 +589,11 @@ t_extract_with_type(_) ->
(Type, Cfg) ->
IsOnly(Type, Cfg)
end,
?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)),
?assertEqual(undefined, emqx_limiter_utils:extract_with_type(messages, undefined)),
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
emqx_limiter_utils:extract_with_type(messages, #{
messages => #{rate => 1}, bytes => #{rate => 1}
})
)
@ -601,7 +601,7 @@ t_extract_with_type(_) ->
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
emqx_limiter_utils:extract_with_type(messages, #{
messages => #{rate => 1},
bytes => #{rate => 1},
client => #{messages => #{rate => 2}}
@ -611,7 +611,7 @@ t_extract_with_type(_) ->
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
emqx_limiter_utils:extract_with_type(messages, #{
client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
})
)
@ -622,7 +622,7 @@ t_add_bucket(_) ->
#{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)),
?assertEqual(Size, maps:size(Buckets), Buckets)
end,
DefBucket = emqx_limiter_schema:default_bucket_config(),
DefBucket = emqx_limiter_utils:default_bucket_config(),
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)),
Checker(0),
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)),
@ -765,7 +765,7 @@ t_esockd_htb_consume(_) ->
t_node_short_paths(_) ->
CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
Accessor = fun emqx_limiter_utils:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 10.0}, Accessor(messages)),
?assertMatch(#{rate := 1.0}, Accessor(bytes)),
@ -776,7 +776,7 @@ t_compatibility_for_node_short_paths(_) ->
CfgStr =
<<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
Accessor = fun emqx_limiter_schema:get_node_opts/1,
Accessor = fun emqx_limiter_utils:get_node_opts/1,
?assertMatch(#{rate := 100.0}, Accessor(connection)),
?assertMatch(#{rate := 20.0}, Accessor(bytes)).
@ -796,7 +796,7 @@ t_listener_short_paths(_) ->
},
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
emqx_limiter_utils:get_listener_opts(ListenerOpt)
).
t_compatibility_for_listener_short_paths(_) ->
@ -809,7 +809,7 @@ t_compatibility_for_listener_short_paths(_) ->
#{
connection := #{rate := 100.0}
},
emqx_limiter_schema:get_listener_opts(ListenerOpt)
emqx_limiter_utils:get_listener_opts(ListenerOpt)
).
t_no_limiter_for_listener(_) ->
@ -818,7 +818,7 @@ t_no_limiter_for_listener(_) ->
ListenerOpt = emqx:get_config([listeners, tcp, default]),
?assertEqual(
undefined,
emqx_limiter_schema:get_listener_opts(ListenerOpt)
emqx_limiter_utils:get_listener_opts(ListenerOpt)
).
%%--------------------------------------------------------------------
@ -1135,5 +1135,5 @@ parse_schema(ConfigString) ->
).
default_client_config() ->
Conf = emqx_limiter_schema:default_client_config(),
Conf = emqx_limiter_utils:default_client_config(),
Conf#{divisible := false, max_retry_time := timer:seconds(10)}.