Merge pull request #11423 from lafirest/chore/limiter_utils
fix(limiter): simplify the code of the limiter
This commit is contained in:
commit
4eaced241d
|
@ -139,7 +139,7 @@ make_local_limiter(Cfg, Bucket) ->
|
||||||
tokens => emqx_limiter_server:get_initial_val(Cfg),
|
tokens => emqx_limiter_server:get_initial_val(Cfg),
|
||||||
lasttime => ?NOW,
|
lasttime => ?NOW,
|
||||||
bucket => Bucket,
|
bucket => Bucket,
|
||||||
capacity => emqx_limiter_schema:calc_capacity(Cfg)
|
capacity => emqx_limiter_utils:calc_capacity(Cfg)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%@doc create a limiter server's reference
|
%%@doc create a limiter server's reference
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_limiter_correction).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([add/2]).
|
|
||||||
|
|
||||||
-type correction_value() :: #{
|
|
||||||
correction := emqx_limiter_decimal:zero_or_float(),
|
|
||||||
any() => any()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-export_type([correction_value/0]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-spec add(number(), correction_value()) -> {integer(), correction_value()}.
|
|
||||||
add(Inc, #{correction := Correction} = Data) ->
|
|
||||||
FixedInc = Inc + Correction,
|
|
||||||
IntInc = erlang:floor(FixedInc),
|
|
||||||
{IntInc, Data#{correction := FixedInc - IntInc}}.
|
|
|
@ -24,13 +24,19 @@
|
||||||
sub/2,
|
sub/2,
|
||||||
mul/2,
|
mul/2,
|
||||||
put_to_counter/3,
|
put_to_counter/3,
|
||||||
floor_div/2
|
floor_div/2,
|
||||||
|
precisely_add/2
|
||||||
]).
|
]).
|
||||||
-export_type([decimal/0, zero_or_float/0]).
|
-export_type([decimal/0, zero_or_float/0, correction_value/0]).
|
||||||
|
|
||||||
-type decimal() :: infinity | number().
|
-type decimal() :: infinity | number().
|
||||||
-type zero_or_float() :: 0 | float().
|
-type zero_or_float() :: 0 | float().
|
||||||
|
|
||||||
|
-type correction_value() :: #{
|
||||||
|
correction := emqx_limiter_decimal:zero_or_float(),
|
||||||
|
any() => any()
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%%% API
|
%%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -43,6 +49,12 @@ add(A, B) when
|
||||||
add(A, B) ->
|
add(A, B) ->
|
||||||
A + B.
|
A + B.
|
||||||
|
|
||||||
|
-spec precisely_add(number(), correction_value()) -> {integer(), correction_value()}.
|
||||||
|
precisely_add(Inc, #{correction := Correction} = Data) ->
|
||||||
|
FixedInc = Inc + Correction,
|
||||||
|
IntInc = erlang:floor(FixedInc),
|
||||||
|
{IntInc, Data#{correction := FixedInc - IntInc}}.
|
||||||
|
|
||||||
-spec sub(decimal(), decimal()) -> decimal().
|
-spec sub(decimal(), decimal()) -> decimal().
|
||||||
sub(A, B) when
|
sub(A, B) when
|
||||||
A =:= infinity orelse
|
A =:= infinity orelse
|
||||||
|
|
|
@ -131,7 +131,7 @@ delete_root(Type) ->
|
||||||
delete_bucket(?ROOT_ID, Type).
|
delete_bucket(?ROOT_ID, Type).
|
||||||
|
|
||||||
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
|
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
|
||||||
Conf = emqx_limiter_schema:convert_node_opts(NewConf),
|
Conf = emqx_limiter_utils:convert_node_opts(NewConf),
|
||||||
_ = [on_post_config_update(Type, Cfg) || {Type, Cfg} <- maps:to_list(Conf)],
|
_ = [on_post_config_update(Type, Cfg) || {Type, Cfg} <- maps:to_list(Conf)],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -33,14 +33,7 @@
|
||||||
desc/1,
|
desc/1,
|
||||||
types/0,
|
types/0,
|
||||||
short_paths/0,
|
short_paths/0,
|
||||||
calc_capacity/1,
|
short_paths_fields/1
|
||||||
extract_with_type/2,
|
|
||||||
default_client_config/0,
|
|
||||||
default_bucket_config/0,
|
|
||||||
short_paths_fields/1,
|
|
||||||
get_listener_opts/1,
|
|
||||||
get_node_opts/1,
|
|
||||||
convert_node_opts/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(KILOBYTE, 1024).
|
-define(KILOBYTE, 1024).
|
||||||
|
@ -263,80 +256,6 @@ types() ->
|
||||||
short_paths() ->
|
short_paths() ->
|
||||||
[max_conn_rate, messages_rate, bytes_rate].
|
[max_conn_rate, messages_rate, bytes_rate].
|
||||||
|
|
||||||
calc_capacity(#{rate := infinity}) ->
|
|
||||||
infinity;
|
|
||||||
calc_capacity(#{rate := Rate, burst := Burst}) ->
|
|
||||||
erlang:floor(1000 * Rate / default_period()) + Burst.
|
|
||||||
|
|
||||||
extract_with_type(_Type, undefined) ->
|
|
||||||
undefined;
|
|
||||||
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
|
|
||||||
BucketVal = maps:find(Type, BucketCfg),
|
|
||||||
ClientVal = maps:find(Type, ClientCfg),
|
|
||||||
merge_client_bucket(Type, ClientVal, BucketVal);
|
|
||||||
extract_with_type(Type, BucketCfg) ->
|
|
||||||
BucketVal = maps:find(Type, BucketCfg),
|
|
||||||
merge_client_bucket(Type, undefined, BucketVal).
|
|
||||||
|
|
||||||
%% Since the client configuration can be absent and be a undefined value,
|
|
||||||
%% but we must need some basic settings to control the behaviour of the limiter,
|
|
||||||
%% so here add this helper function to generate a default setting.
|
|
||||||
%% This is a temporary workaround until we found a better way to simplify.
|
|
||||||
default_client_config() ->
|
|
||||||
#{
|
|
||||||
rate => infinity,
|
|
||||||
initial => 0,
|
|
||||||
low_watermark => 0,
|
|
||||||
burst => 0,
|
|
||||||
divisible => true,
|
|
||||||
max_retry_time => timer:hours(1),
|
|
||||||
failure_strategy => force
|
|
||||||
}.
|
|
||||||
|
|
||||||
default_bucket_config() ->
|
|
||||||
#{
|
|
||||||
rate => infinity,
|
|
||||||
burst => 0,
|
|
||||||
initial => 0
|
|
||||||
}.
|
|
||||||
|
|
||||||
get_listener_opts(Conf) ->
|
|
||||||
Limiter = maps:get(limiter, Conf, undefined),
|
|
||||||
ShortPaths = maps:with(short_paths(), Conf),
|
|
||||||
get_listener_opts(Limiter, ShortPaths).
|
|
||||||
|
|
||||||
get_node_opts(Type) ->
|
|
||||||
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
|
|
||||||
case type_to_short_path_name(Type) of
|
|
||||||
undefined ->
|
|
||||||
Opts;
|
|
||||||
Name ->
|
|
||||||
case emqx:get_config([limiter, Name], undefined) of
|
|
||||||
undefined ->
|
|
||||||
Opts;
|
|
||||||
Rate ->
|
|
||||||
Opts#{rate := Rate}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
convert_node_opts(Conf) ->
|
|
||||||
DefBucket = default_bucket_config(),
|
|
||||||
ShorPaths = short_paths(),
|
|
||||||
Fun = fun
|
|
||||||
%% The `client` in the node options was deprecated
|
|
||||||
(client, _Value, Acc) ->
|
|
||||||
Acc;
|
|
||||||
(Name, Value, Acc) ->
|
|
||||||
case lists:member(Name, ShorPaths) of
|
|
||||||
true ->
|
|
||||||
Type = short_path_name_to_type(Name),
|
|
||||||
Acc#{Type => DefBucket#{rate => Value}};
|
|
||||||
_ ->
|
|
||||||
Acc#{Name => Value}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
maps:fold(Fun, #{}, Conf).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -538,51 +457,3 @@ alias_of_type(bytes) ->
|
||||||
[bytes_in];
|
[bytes_in];
|
||||||
alias_of_type(_) ->
|
alias_of_type(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
|
|
||||||
#{Type => BucketVal, client => #{Type => ClientVal}};
|
|
||||||
merge_client_bucket(Type, {ok, ClientVal}, _) ->
|
|
||||||
#{client => #{Type => ClientVal}};
|
|
||||||
merge_client_bucket(Type, _, {ok, BucketVal}) ->
|
|
||||||
#{Type => BucketVal};
|
|
||||||
merge_client_bucket(_, _, _) ->
|
|
||||||
undefined.
|
|
||||||
|
|
||||||
short_path_name_to_type(max_conn_rate) ->
|
|
||||||
connection;
|
|
||||||
short_path_name_to_type(messages_rate) ->
|
|
||||||
messages;
|
|
||||||
short_path_name_to_type(bytes_rate) ->
|
|
||||||
bytes.
|
|
||||||
|
|
||||||
type_to_short_path_name(connection) ->
|
|
||||||
max_conn_rate;
|
|
||||||
type_to_short_path_name(messages) ->
|
|
||||||
messages_rate;
|
|
||||||
type_to_short_path_name(bytes) ->
|
|
||||||
bytes_rate;
|
|
||||||
type_to_short_path_name(_) ->
|
|
||||||
undefined.
|
|
||||||
|
|
||||||
get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
|
|
||||||
Limiter;
|
|
||||||
get_listener_opts(undefined, ShortPaths) ->
|
|
||||||
convert_listener_short_paths(ShortPaths);
|
|
||||||
get_listener_opts(Limiter, ShortPaths) ->
|
|
||||||
Shorts = convert_listener_short_paths(ShortPaths),
|
|
||||||
emqx_utils_maps:deep_merge(Limiter, Shorts).
|
|
||||||
|
|
||||||
convert_listener_short_paths(ShortPaths) ->
|
|
||||||
DefBucket = default_bucket_config(),
|
|
||||||
DefClient = default_client_config(),
|
|
||||||
Fun = fun(Name, Rate, Acc) ->
|
|
||||||
Type = short_path_name_to_type(Name),
|
|
||||||
case Name of
|
|
||||||
max_conn_rate ->
|
|
||||||
Acc#{Type => DefBucket#{rate => Rate}};
|
|
||||||
_ ->
|
|
||||||
Client = maps:get(client, Acc, #{}),
|
|
||||||
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
maps:fold(Fun, #{}, ShortPaths).
|
|
||||||
|
|
|
@ -383,7 +383,7 @@ longitudinal(
|
||||||
|
|
||||||
case lists:min([ShouldAlloc, Flow, Capacity]) of
|
case lists:min([ShouldAlloc, Flow, Capacity]) of
|
||||||
Available when Available > 0 ->
|
Available when Available > 0 ->
|
||||||
{Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
|
{Inc, Bucket2} = emqx_limiter_decimal:precisely_add(Available, Bucket),
|
||||||
counters:add(Counter, Index, Inc),
|
counters:add(Counter, Index, Inc),
|
||||||
|
|
||||||
{Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}};
|
{Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}};
|
||||||
|
@ -419,7 +419,7 @@ maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} =
|
||||||
State;
|
State;
|
||||||
_ ->
|
_ ->
|
||||||
Available = erlang:min(Rate - Token, InFlow),
|
Available = erlang:min(Rate - Token, InFlow),
|
||||||
{Inc, Root2} = emqx_limiter_correction:add(Available, Root),
|
{Inc, Root2} = emqx_limiter_decimal:precisely_add(Available, Root),
|
||||||
counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
|
counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
|
||||||
State#{root := Root2}
|
State#{root := Root2}
|
||||||
end.
|
end.
|
||||||
|
@ -473,7 +473,7 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) ->
|
||||||
index := Index,
|
index := Index,
|
||||||
obtained := Obtained
|
obtained := Obtained
|
||||||
} = Bucket,
|
} = Bucket,
|
||||||
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
|
{Inc, Bucket2} = emqx_limiter_decimal:precisely_add(InFlow, Bucket),
|
||||||
|
|
||||||
counters:add(Counter, Index, Inc),
|
counters:add(Counter, Index, Inc),
|
||||||
|
|
||||||
|
@ -484,7 +484,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
|
||||||
|
|
||||||
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
|
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
|
||||||
init_tree(Type) when is_atom(Type) ->
|
init_tree(Type) when is_atom(Type) ->
|
||||||
Cfg = emqx_limiter_schema:get_node_opts(Type),
|
Cfg = emqx_limiter_utils:get_node_opts(Type),
|
||||||
init_tree(Type, Cfg).
|
init_tree(Type, Cfg).
|
||||||
|
|
||||||
init_tree(Type, #{rate := Rate} = Cfg) ->
|
init_tree(Type, #{rate := Rate} = Cfg) ->
|
||||||
|
@ -515,7 +515,7 @@ do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
make_bucket(Id, Cfg, State);
|
make_bucket(Id, Cfg, State);
|
||||||
Bucket ->
|
Bucket ->
|
||||||
Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)},
|
Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_utils:calc_capacity(Cfg)},
|
||||||
State#{buckets := Buckets#{Id := Bucket2}}
|
State#{buckets := Buckets#{Id := Bucket2}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -536,7 +536,7 @@ make_bucket(
|
||||||
rate => Rate,
|
rate => Rate,
|
||||||
obtained => Initial,
|
obtained => Initial,
|
||||||
correction => 0,
|
correction => 0,
|
||||||
capacity => emqx_limiter_schema:calc_capacity(Cfg),
|
capacity => emqx_limiter_utils:calc_capacity(Cfg),
|
||||||
counter => Counter,
|
counter => Counter,
|
||||||
index => NewIndex
|
index => NewIndex
|
||||||
},
|
},
|
||||||
|
@ -601,7 +601,7 @@ create_limiter_without_client(Id, Type, BucketCfg) ->
|
||||||
false ->
|
false ->
|
||||||
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
{ok, emqx_htb_limiter:make_infinity_limiter()};
|
||||||
{ok, Bucket, RefCfg} ->
|
{ok, Bucket, RefCfg} ->
|
||||||
ClientCfg = emqx_limiter_schema:default_client_config(),
|
ClientCfg = emqx_limiter_utils:default_client_config(),
|
||||||
create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
|
create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
|
@ -627,7 +627,7 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
|
||||||
end;
|
end;
|
||||||
%% this is a node-level reference
|
%% this is a node-level reference
|
||||||
find_referenced_bucket(_Id, Type, _) ->
|
find_referenced_bucket(_Id, Type, _) ->
|
||||||
case emqx_limiter_schema:get_node_opts(Type) of
|
case emqx_limiter_utils:get_node_opts(Type) of
|
||||||
#{rate := infinity} ->
|
#{rate := infinity} ->
|
||||||
false;
|
false;
|
||||||
NodeCfg ->
|
NodeCfg ->
|
||||||
|
|
|
@ -86,7 +86,7 @@ init([]) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--==================================================================
|
%%--==================================================================
|
||||||
make_child(Type) ->
|
make_child(Type) ->
|
||||||
Cfg = emqx_limiter_schema:get_node_opts(Type),
|
Cfg = emqx_limiter_utils:get_node_opts(Type),
|
||||||
make_child(Type, Cfg).
|
make_child(Type, Cfg).
|
||||||
|
|
||||||
make_child(Type, Cfg) ->
|
make_child(Type, Cfg) ->
|
||||||
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_limiter_utils).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
calc_capacity/1,
|
||||||
|
extract_with_type/2,
|
||||||
|
get_listener_opts/1,
|
||||||
|
get_node_opts/1,
|
||||||
|
convert_node_opts/1,
|
||||||
|
default_client_config/0,
|
||||||
|
default_bucket_config/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-import(emqx_limiter_schema, [default_period/0, short_paths/0]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Configuration-related runtime code
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
calc_capacity(#{rate := infinity}) ->
|
||||||
|
infinity;
|
||||||
|
calc_capacity(#{rate := Rate, burst := Burst}) ->
|
||||||
|
erlang:floor(1000 * Rate / default_period()) + Burst.
|
||||||
|
|
||||||
|
%% @doc extract data of a type from the nested config
|
||||||
|
extract_with_type(_Type, undefined) ->
|
||||||
|
undefined;
|
||||||
|
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
|
||||||
|
BucketVal = maps:find(Type, BucketCfg),
|
||||||
|
ClientVal = maps:find(Type, ClientCfg),
|
||||||
|
merge_client_bucket(Type, ClientVal, BucketVal);
|
||||||
|
extract_with_type(Type, BucketCfg) ->
|
||||||
|
BucketVal = maps:find(Type, BucketCfg),
|
||||||
|
merge_client_bucket(Type, undefined, BucketVal).
|
||||||
|
|
||||||
|
%% @doc get the limiter configuration from the listener setting
|
||||||
|
%% and compatible with the old version limiter schema
|
||||||
|
get_listener_opts(Conf) ->
|
||||||
|
Limiter = maps:get(limiter, Conf, undefined),
|
||||||
|
ShortPaths = maps:with(short_paths(), Conf),
|
||||||
|
get_listener_opts(Limiter, ShortPaths).
|
||||||
|
|
||||||
|
get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
|
||||||
|
Limiter;
|
||||||
|
get_listener_opts(undefined, ShortPaths) ->
|
||||||
|
convert_listener_short_paths(ShortPaths);
|
||||||
|
get_listener_opts(Limiter, ShortPaths) ->
|
||||||
|
Shorts = convert_listener_short_paths(ShortPaths),
|
||||||
|
emqx_utils_maps:deep_merge(Limiter, Shorts).
|
||||||
|
|
||||||
|
convert_listener_short_paths(ShortPaths) ->
|
||||||
|
DefBucket = default_bucket_config(),
|
||||||
|
DefClient = default_client_config(),
|
||||||
|
Fun = fun(Name, Rate, Acc) ->
|
||||||
|
Type = short_path_name_to_type(Name),
|
||||||
|
case Name of
|
||||||
|
max_conn_rate ->
|
||||||
|
Acc#{Type => DefBucket#{rate => Rate}};
|
||||||
|
_ ->
|
||||||
|
Client = maps:get(client, Acc, #{}),
|
||||||
|
Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
maps:fold(Fun, #{}, ShortPaths).
|
||||||
|
|
||||||
|
%% @doc get the node-level limiter configuration and compatible with the old version limiter schema
|
||||||
|
get_node_opts(Type) ->
|
||||||
|
Opts = emqx:get_config([limiter, Type], default_bucket_config()),
|
||||||
|
case type_to_short_path_name(Type) of
|
||||||
|
undefined ->
|
||||||
|
Opts;
|
||||||
|
Name ->
|
||||||
|
case emqx:get_config([limiter, Name], undefined) of
|
||||||
|
undefined ->
|
||||||
|
Opts;
|
||||||
|
Rate ->
|
||||||
|
Opts#{rate := Rate}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
convert_node_opts(Conf) ->
|
||||||
|
DefBucket = default_bucket_config(),
|
||||||
|
ShorPaths = short_paths(),
|
||||||
|
Fun = fun
|
||||||
|
%% The `client` in the node options was deprecated
|
||||||
|
(client, _Value, Acc) ->
|
||||||
|
Acc;
|
||||||
|
(Name, Value, Acc) ->
|
||||||
|
case lists:member(Name, ShorPaths) of
|
||||||
|
true ->
|
||||||
|
Type = short_path_name_to_type(Name),
|
||||||
|
Acc#{Type => DefBucket#{rate => Value}};
|
||||||
|
_ ->
|
||||||
|
Acc#{Name => Value}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
maps:fold(Fun, #{}, Conf).
|
||||||
|
|
||||||
|
merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
|
||||||
|
#{Type => BucketVal, client => #{Type => ClientVal}};
|
||||||
|
merge_client_bucket(Type, {ok, ClientVal}, _) ->
|
||||||
|
#{client => #{Type => ClientVal}};
|
||||||
|
merge_client_bucket(Type, _, {ok, BucketVal}) ->
|
||||||
|
#{Type => BucketVal};
|
||||||
|
merge_client_bucket(_, _, _) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
short_path_name_to_type(max_conn_rate) ->
|
||||||
|
connection;
|
||||||
|
short_path_name_to_type(messages_rate) ->
|
||||||
|
messages;
|
||||||
|
short_path_name_to_type(bytes_rate) ->
|
||||||
|
bytes.
|
||||||
|
|
||||||
|
type_to_short_path_name(connection) ->
|
||||||
|
max_conn_rate;
|
||||||
|
type_to_short_path_name(messages) ->
|
||||||
|
messages_rate;
|
||||||
|
type_to_short_path_name(bytes) ->
|
||||||
|
bytes_rate;
|
||||||
|
type_to_short_path_name(_) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
%% Since the client configuration can be absent and be a undefined value,
|
||||||
|
%% but we must need some basic settings to control the behaviour of the limiter,
|
||||||
|
%% so here add this helper function to generate a default setting.
|
||||||
|
%% This is a temporary workaround until we found a better way to simplify.
|
||||||
|
default_client_config() ->
|
||||||
|
#{
|
||||||
|
rate => infinity,
|
||||||
|
initial => 0,
|
||||||
|
low_watermark => 0,
|
||||||
|
burst => 0,
|
||||||
|
divisible => true,
|
||||||
|
max_retry_time => timer:hours(1),
|
||||||
|
failure_strategy => force
|
||||||
|
}.
|
||||||
|
|
||||||
|
default_bucket_config() ->
|
||||||
|
#{
|
||||||
|
rate => infinity,
|
||||||
|
burst => 0,
|
||||||
|
initial => 0
|
||||||
|
}.
|
|
@ -587,7 +587,7 @@ esockd_opts(ListenerId, Type, Opts0) ->
|
||||||
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
||||||
Limiter = limiter(Opts0),
|
Limiter = limiter(Opts0),
|
||||||
Opts2 =
|
Opts2 =
|
||||||
case emqx_limiter_schema:extract_with_type(connection, Limiter) of
|
case emqx_limiter_utils:extract_with_type(connection, Limiter) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Opts1;
|
Opts1;
|
||||||
BucketCfg ->
|
BucketCfg ->
|
||||||
|
@ -732,7 +732,7 @@ zone(Opts) ->
|
||||||
maps:get(zone, Opts, undefined).
|
maps:get(zone, Opts, undefined).
|
||||||
|
|
||||||
limiter(Opts) ->
|
limiter(Opts) ->
|
||||||
emqx_limiter_schema:get_listener_opts(Opts).
|
emqx_limiter_utils:get_listener_opts(Opts).
|
||||||
|
|
||||||
add_limiter_bucket(_Id, undefined) ->
|
add_limiter_bucket(_Id, undefined) ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -589,11 +589,11 @@ t_extract_with_type(_) ->
|
||||||
(Type, Cfg) ->
|
(Type, Cfg) ->
|
||||||
IsOnly(Type, Cfg)
|
IsOnly(Type, Cfg)
|
||||||
end,
|
end,
|
||||||
?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)),
|
?assertEqual(undefined, emqx_limiter_utils:extract_with_type(messages, undefined)),
|
||||||
?assert(
|
?assert(
|
||||||
Checker(
|
Checker(
|
||||||
messages,
|
messages,
|
||||||
emqx_limiter_schema:extract_with_type(messages, #{
|
emqx_limiter_utils:extract_with_type(messages, #{
|
||||||
messages => #{rate => 1}, bytes => #{rate => 1}
|
messages => #{rate => 1}, bytes => #{rate => 1}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
@ -601,7 +601,7 @@ t_extract_with_type(_) ->
|
||||||
?assert(
|
?assert(
|
||||||
Checker(
|
Checker(
|
||||||
messages,
|
messages,
|
||||||
emqx_limiter_schema:extract_with_type(messages, #{
|
emqx_limiter_utils:extract_with_type(messages, #{
|
||||||
messages => #{rate => 1},
|
messages => #{rate => 1},
|
||||||
bytes => #{rate => 1},
|
bytes => #{rate => 1},
|
||||||
client => #{messages => #{rate => 2}}
|
client => #{messages => #{rate => 2}}
|
||||||
|
@ -611,7 +611,7 @@ t_extract_with_type(_) ->
|
||||||
?assert(
|
?assert(
|
||||||
Checker(
|
Checker(
|
||||||
messages,
|
messages,
|
||||||
emqx_limiter_schema:extract_with_type(messages, #{
|
emqx_limiter_utils:extract_with_type(messages, #{
|
||||||
client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
|
client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
@ -622,7 +622,7 @@ t_add_bucket(_) ->
|
||||||
#{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)),
|
#{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)),
|
||||||
?assertEqual(Size, maps:size(Buckets), Buckets)
|
?assertEqual(Size, maps:size(Buckets), Buckets)
|
||||||
end,
|
end,
|
||||||
DefBucket = emqx_limiter_schema:default_bucket_config(),
|
DefBucket = emqx_limiter_utils:default_bucket_config(),
|
||||||
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)),
|
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)),
|
||||||
Checker(0),
|
Checker(0),
|
||||||
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)),
|
?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)),
|
||||||
|
@ -765,7 +765,7 @@ t_esockd_htb_consume(_) ->
|
||||||
t_node_short_paths(_) ->
|
t_node_short_paths(_) ->
|
||||||
CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
|
CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
|
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
|
||||||
Accessor = fun emqx_limiter_schema:get_node_opts/1,
|
Accessor = fun emqx_limiter_utils:get_node_opts/1,
|
||||||
?assertMatch(#{rate := 100.0}, Accessor(connection)),
|
?assertMatch(#{rate := 100.0}, Accessor(connection)),
|
||||||
?assertMatch(#{rate := 10.0}, Accessor(messages)),
|
?assertMatch(#{rate := 10.0}, Accessor(messages)),
|
||||||
?assertMatch(#{rate := 1.0}, Accessor(bytes)),
|
?assertMatch(#{rate := 1.0}, Accessor(bytes)),
|
||||||
|
@ -776,7 +776,7 @@ t_compatibility_for_node_short_paths(_) ->
|
||||||
CfgStr =
|
CfgStr =
|
||||||
<<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
|
<<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
|
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
|
||||||
Accessor = fun emqx_limiter_schema:get_node_opts/1,
|
Accessor = fun emqx_limiter_utils:get_node_opts/1,
|
||||||
?assertMatch(#{rate := 100.0}, Accessor(connection)),
|
?assertMatch(#{rate := 100.0}, Accessor(connection)),
|
||||||
?assertMatch(#{rate := 20.0}, Accessor(bytes)).
|
?assertMatch(#{rate := 20.0}, Accessor(bytes)).
|
||||||
|
|
||||||
|
@ -796,7 +796,7 @@ t_listener_short_paths(_) ->
|
||||||
},
|
},
|
||||||
connection := #{rate := 100.0}
|
connection := #{rate := 100.0}
|
||||||
},
|
},
|
||||||
emqx_limiter_schema:get_listener_opts(ListenerOpt)
|
emqx_limiter_utils:get_listener_opts(ListenerOpt)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_compatibility_for_listener_short_paths(_) ->
|
t_compatibility_for_listener_short_paths(_) ->
|
||||||
|
@ -809,7 +809,7 @@ t_compatibility_for_listener_short_paths(_) ->
|
||||||
#{
|
#{
|
||||||
connection := #{rate := 100.0}
|
connection := #{rate := 100.0}
|
||||||
},
|
},
|
||||||
emqx_limiter_schema:get_listener_opts(ListenerOpt)
|
emqx_limiter_utils:get_listener_opts(ListenerOpt)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_no_limiter_for_listener(_) ->
|
t_no_limiter_for_listener(_) ->
|
||||||
|
@ -818,7 +818,7 @@ t_no_limiter_for_listener(_) ->
|
||||||
ListenerOpt = emqx:get_config([listeners, tcp, default]),
|
ListenerOpt = emqx:get_config([listeners, tcp, default]),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
undefined,
|
undefined,
|
||||||
emqx_limiter_schema:get_listener_opts(ListenerOpt)
|
emqx_limiter_utils:get_listener_opts(ListenerOpt)
|
||||||
).
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1135,5 +1135,5 @@ parse_schema(ConfigString) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
default_client_config() ->
|
default_client_config() ->
|
||||||
Conf = emqx_limiter_schema:default_client_config(),
|
Conf = emqx_limiter_utils:default_client_config(),
|
||||||
Conf#{divisible := false, max_retry_time := timer:seconds(10)}.
|
Conf#{divisible := false, max_retry_time := timer:seconds(10)}.
|
||||||
|
|
Loading…
Reference in New Issue