Merge pull request #10591 from lafirest/fix/simplify_limiter_client_cfg

perf(limiter): simplify the memory represent of limiter configuration
This commit is contained in:
lafirest 2023-05-05 16:59:04 +08:00 committed by GitHub
commit 335d948bce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 441 additions and 167 deletions

View File

@ -22,7 +22,7 @@
%% API
-export([
make_token_bucket_limiter/2,
make_local_limiter/2,
make_ref_limiter/2,
check/2,
consume/2,
@ -32,12 +32,11 @@
make_future/1,
available/1
]).
-export_type([token_bucket_limiter/0]).
-export_type([local_limiter/0]).
%% a token bucket limiter with a limiter server's bucket reference
%% the number of tokens currently available
-type token_bucket_limiter() :: #{
%% a token bucket limiter which may or not contains a reference to another limiter,
%% and can be used in a client alone
-type local_limiter() :: #{
tokens := non_neg_integer(),
rate := decimal(),
capacity := decimal(),
@ -58,12 +57,12 @@
retry_ctx =>
undefined
%% the retry context
| retry_context(token_bucket_limiter()),
| retry_context(local_limiter()),
%% allow to add other keys
atom => any()
}.
%% a limiter server's bucket reference
%% a limiter instance which only contains a reference to another limiter(bucket)
-type ref_limiter() :: #{
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy(),
@ -88,7 +87,7 @@
}.
-type bucket() :: emqx_limiter_bucket_ref:bucket_ref().
-type limiter() :: token_bucket_limiter() | ref_limiter() | infinity.
-type limiter() :: local_limiter() | ref_limiter() | infinity.
-type millisecond() :: non_neg_integer().
-type pause_type() :: pause | partial.
@ -116,7 +115,7 @@
rate := decimal(),
initial := non_neg_integer(),
low_watermark := non_neg_integer(),
capacity := decimal(),
burst := decimal(),
divisible := boolean(),
max_retry_time := non_neg_integer(),
failure_strategy := failure_strategy()
@ -134,8 +133,8 @@
%% API
%%--------------------------------------------------------------------
%%@doc create a limiter
-spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_token_bucket_limiter(Cfg, Bucket) ->
-spec make_local_limiter(limiter_bucket_cfg(), bucket()) -> _.
make_local_limiter(Cfg, Bucket) ->
Cfg#{
tokens => emqx_limiter_server:get_initial_val(Cfg),
lasttime => ?NOW,
@ -312,8 +311,8 @@ on_failure(throw, Limiter) ->
Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]),
erlang:throw({rate_check_fail, Message}).
-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) ->
inner_check_result(token_bucket_limiter()).
-spec do_check_with_parent_limiter(pos_integer(), local_limiter()) ->
inner_check_result(local_limiter()).
do_check_with_parent_limiter(
Need,
#{
@ -336,7 +335,7 @@ do_check_with_parent_limiter(
)
end.
-spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()).
-spec do_reset(pos_integer(), local_limiter()) -> inner_check_result(local_limiter()).
do_reset(
Need,
#{

View File

@ -30,6 +30,12 @@
post_config_update/5
]).
-export([
find_root/1,
insert_root/2,
delete_root/1
]).
-export([
start_server/1,
start_server/2,
@ -62,6 +68,7 @@
-define(UID(Id, Type), {Id, Type}).
-define(TAB, emqx_limiter_counters).
-define(ROOT_ID, root).
%%--------------------------------------------------------------------
%% API
@ -104,9 +111,25 @@ insert_bucket(Id, Type, Bucket) ->
).
-spec delete_bucket(limiter_id(), limiter_type()) -> true.
delete_bucket(Type, Id) ->
delete_bucket(Id, Type) ->
ets:delete(?TAB, ?UID(Id, Type)).
-spec find_root(limiter_type()) ->
{ok, bucket_ref()} | undefined.
find_root(Type) ->
find_bucket(?ROOT_ID, Type).
-spec insert_root(
limiter_type(),
bucket_ref()
) -> boolean().
insert_root(Type, Bucket) ->
insert_bucket(?ROOT_ID, Type, Bucket).
-spec delete_root(limiter_type()) -> true.
delete_root(Type) ->
delete_bucket(?ROOT_ID, Type).
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
Types = lists:delete(client, maps:keys(NewConf)),
_ = [on_post_config_update(Type, NewConf) || Type <- Types],

View File

@ -32,7 +32,9 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
calc_capacity/1
calc_capacity/1,
extract_with_type/2,
default_client_config/0
]).
-define(KILOBYTE, 1024).
@ -94,30 +96,33 @@
namespace() -> limiter.
roots() ->
[{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}].
[
{limiter,
hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{
importance => ?IMPORTANCE_HIDDEN
})}
].
fields(limiter) ->
[
{Type,
?HOCON(?R_REF(node_opts), #{
desc => ?DESC(Type),
default => #{},
importance => ?IMPORTANCE_HIDDEN,
aliases => alias_of_type(Type)
})}
|| Type <- types()
] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
?HOCON(
?R_REF(client_fields),
#{
desc => ?DESC(client),
importance => ?IMPORTANCE_HIDDEN,
default => maps:from_list([
{erlang:atom_to_binary(Type), #{}}
|| Type <- types()
])
required => {false, recursively},
deprecated => {since, "5.0.25"}
}
)}
];
@ -131,7 +136,7 @@ fields(node_opts) ->
})}
];
fields(client_fields) ->
client_fields(types(), #{default => #{}});
client_fields(types());
fields(bucket_opts) ->
fields_of_bucket(<<"infinity">>);
fields(client_opts) ->
@ -194,7 +199,7 @@ fields(client_opts) ->
fields(listener_fields) ->
composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields);
fields(listener_client_fields) ->
client_fields(?LISTENER_BUCKET_KEYS, #{required => false});
client_fields(?LISTENER_BUCKET_KEYS);
fields(Type) ->
simple_bucket_field(Type).
@ -236,6 +241,31 @@ calc_capacity(#{rate := infinity}) ->
calc_capacity(#{rate := Rate, burst := Burst}) ->
erlang:floor(1000 * Rate / default_period()) + Burst.
extract_with_type(_Type, undefined) ->
undefined;
extract_with_type(Type, #{client := ClientCfg} = BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
ClientVal = maps:find(Type, ClientCfg),
merge_client_bucket(Type, ClientVal, BucketVal);
extract_with_type(Type, BucketCfg) ->
BucketVal = maps:find(Type, BucketCfg),
merge_client_bucket(Type, undefined, BucketVal).
%% Since the client configuration can be absent and be a undefined value,
%% but we must need some basic settings to control the behaviour of the limiter,
%% so here add this helper function to generate a default setting.
%% This is a temporary workaround until we found a better way to simplify.
default_client_config() ->
#{
rate => infinity,
initial => 0,
low_watermark => 0,
burst => 0,
divisible => false,
max_retry_time => timer:seconds(10),
failure_strategy => force
}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -362,7 +392,7 @@ simple_bucket_field(Type) when is_atom(Type) ->
?R_REF(?MODULE, client_opts),
#{
desc => ?DESC(client),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
}
@ -375,7 +405,7 @@ composite_bucket_fields(Types, ClientRef) ->
{Type,
?HOCON(?R_REF(?MODULE, bucket_opts), #{
desc => ?DESC(?MODULE, Type),
required => false,
required => {false, recursively},
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
@ -387,7 +417,7 @@ composite_bucket_fields(Types, ClientRef) ->
?R_REF(?MODULE, ClientRef),
#{
desc => ?DESC(client),
required => false
required => {false, recursively}
}
)}
].
@ -410,11 +440,12 @@ fields_of_bucket(Default) ->
})}
].
client_fields(Types, Meta) ->
client_fields(Types) ->
[
{Type,
?HOCON(?R_REF(client_opts), Meta#{
?HOCON(?R_REF(client_opts), #{
desc => ?DESC(Type),
required => false,
importance => importance_of_type(Type),
aliases => alias_of_type(Type)
})}
@ -436,3 +467,12 @@ alias_of_type(bytes) ->
[bytes_in];
alias_of_type(_) ->
[].
merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) ->
#{Type => BucketVal, client => #{Type => ClientVal}};
merge_client_bucket(Type, {ok, ClientVal}, _) ->
#{client => #{Type => ClientVal}};
merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.

View File

@ -59,7 +59,8 @@
burst := rate(),
%% token generation interval(second)
period := pos_integer(),
produced := float()
produced := float(),
correction := emqx_limiter_decimal:zero_or_float()
}.
-type bucket() :: #{
@ -98,6 +99,7 @@
%% minimum coefficient for overloaded limiter
-define(OVERLOAD_MIN_ALLOC, 0.3).
-define(COUNTER_SIZE, 8).
-define(ROOT_COUNTER_IDX, 1).
-export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
@ -110,47 +112,24 @@
-spec connect(
limiter_id(),
limiter_type(),
bucket_name() | #{limiter_type() => bucket_name() | undefined}
hocons:config() | undefined
) ->
{ok, emqx_htb_limiter:limiter()} | {error, _}.
%% If no bucket path is set in config, there will be no limit
connect(_Id, _Type, undefined) ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
%% undefined is the default situation, no limiter setting by default
connect(Id, Type, undefined) ->
create_limiter(Id, Type, undefined, undefined);
connect(Id, Type, #{rate := _} = Cfg) ->
create_limiter(Id, Type, maps:get(client, Cfg, undefined), Cfg);
connect(Id, Type, Cfg) ->
case find_limiter_cfg(Type, Cfg) of
{_ClientCfg, undefined, _NodeCfg} ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{ClientCfg, #{rate := infinity}, #{rate := infinity}} ->
{ok,
emqx_htb_limiter:make_token_bucket_limiter(
ClientCfg, emqx_limiter_bucket_ref:infinity_bucket()
)};
{
#{rate := CliRate} = ClientCfg,
#{rate := BucketRate} = BucketCfg,
_
} ->
case emqx_limiter_manager:find_bucket(Id, Type) of
{ok, Bucket} ->
BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg),
CliSize = emqx_limiter_schema:calc_capacity(ClientCfg),
{ok,
if
CliRate < BucketRate orelse CliSize < BucketSize ->
emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket);
true ->
emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)
end};
undefined ->
?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}),
{error, invalid_bucket}
end
end.
create_limiter(
Id,
Type,
emqx_utils_maps:deep_get([client, Type], Cfg, undefined),
maps:get(Type, Cfg, undefined)
).
-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok.
add_bucket(_Id, _Type, undefine) ->
add_bucket(_Id, _Type, undefined) ->
ok;
add_bucket(Id, Type, Cfg) ->
?CALL(Type, {add_bucket, Id, Cfg}).
@ -288,7 +267,8 @@ handle_info(Info, State) ->
Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()
) -> any().
terminate(_Reason, _State) ->
terminate(_Reason, #{type := Type}) ->
emqx_limiter_manager:delete_root(Type),
ok.
%%--------------------------------------------------------------------
@ -343,10 +323,14 @@ oscillation(
oscillate(Interval),
Ordereds = get_ordered_buckets(Buckets),
{Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets),
maybe_burst(State#{
buckets := Buckets2,
root := Root#{produced := Produced + Alloced}
}).
State2 = maybe_adjust_root_tokens(
State#{
buckets := Buckets2,
root := Root#{produced := Produced + Alloced}
},
Alloced
),
maybe_burst(State2).
%% @doc horizontal spread
-spec transverse(
@ -419,6 +403,24 @@ get_ordered_buckets(Buckets) ->
Buckets
).
-spec maybe_adjust_root_tokens(state(), float()) -> state().
maybe_adjust_root_tokens(#{root := #{rate := infinity}} = State, _Alloced) ->
State;
maybe_adjust_root_tokens(#{root := #{rate := Rate}} = State, Alloced) when Alloced >= Rate ->
State;
maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} = State, Alloced) ->
InFlow = Rate - Alloced,
Token = counters:get(Counter, ?ROOT_COUNTER_IDX),
case Token >= Rate of
true ->
State;
_ ->
Available = erlang:min(Rate - Token, InFlow),
{Inc, Root2} = emqx_limiter_correction:add(Available, Root),
counters:add(Counter, ?ROOT_COUNTER_IDX, Inc),
State#{root := Root2}
end.
-spec maybe_burst(state()) -> state().
maybe_burst(
#{
@ -482,12 +484,16 @@ init_tree(Type) when is_atom(Type) ->
Cfg = emqx:get_config([limiter, Type]),
init_tree(Type, Cfg).
init_tree(Type, Cfg) ->
init_tree(Type, #{rate := Rate} = Cfg) ->
Counter = counters:new(?COUNTER_SIZE, [write_concurrency]),
RootBucket = emqx_limiter_bucket_ref:new(Counter, ?ROOT_COUNTER_IDX, Rate),
emqx_limiter_manager:insert_root(Type, RootBucket),
#{
type => Type,
root => make_root(Cfg),
counter => counters:new(?COUNTER_SIZE, [write_concurrency]),
index => 0,
counter => Counter,
%% The first slot is reserved for the root
index => ?ROOT_COUNTER_IDX,
buckets => #{}
}.
@ -497,7 +503,8 @@ make_root(#{rate := Rate, burst := Burst}) ->
rate => Rate,
burst => Burst,
period => emqx_limiter_schema:default_period(),
produced => 0.0
produced => 0.0,
correction => 0
}.
do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) ->
@ -571,25 +578,61 @@ call(Type, Msg) ->
gen_server:call(Pid, Msg)
end.
find_limiter_cfg(Type, #{rate := _} = Cfg) ->
{find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)};
find_limiter_cfg(Type, Cfg) ->
{
find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)),
maps:get(Type, Cfg, undefined),
find_node_cfg(Type)
}.
create_limiter(Id, Type, #{rate := Rate} = ClientCfg, BucketCfg) when Rate =/= infinity ->
create_limiter_with_client(Id, Type, ClientCfg, BucketCfg);
create_limiter(Id, Type, _, BucketCfg) ->
create_limiter_without_client(Id, Type, BucketCfg).
find_client_cfg(Type, BucketCfg) ->
NodeCfg = emqx:get_config([limiter, client, Type], undefined),
merge_client_cfg(NodeCfg, BucketCfg).
%% create a limiter with the client-level configuration
create_limiter_with_client(Id, Type, ClientCfg, BucketCfg) ->
case find_referenced_bucket(Id, Type, BucketCfg) of
false ->
{ok, emqx_htb_limiter:make_local_limiter(ClientCfg, infinity)};
{ok, Bucket, RefCfg} ->
create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
Error ->
Error
end.
merge_client_cfg(undefined, BucketCfg) ->
BucketCfg;
merge_client_cfg(NodeCfg, undefined) ->
NodeCfg;
merge_client_cfg(NodeCfg, BucketCfg) ->
maps:merge(NodeCfg, BucketCfg).
%% create a limiter only with the referenced configuration
create_limiter_without_client(Id, Type, BucketCfg) ->
case find_referenced_bucket(Id, Type, BucketCfg) of
false ->
{ok, emqx_htb_limiter:make_infinity_limiter()};
{ok, Bucket, RefCfg} ->
ClientCfg = emqx_limiter_schema:default_client_config(),
create_limiter_with_ref(Bucket, ClientCfg, RefCfg);
Error ->
Error
end.
find_node_cfg(Type) ->
emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}).
create_limiter_with_ref(
Bucket,
#{rate := CliRate} = ClientCfg,
#{rate := RefRate}
) when CliRate < RefRate ->
{ok, emqx_htb_limiter:make_local_limiter(ClientCfg, Bucket)};
create_limiter_with_ref(Bucket, ClientCfg, _) ->
{ok, emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)}.
%% this is a listener(server)-level reference
find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity ->
case emqx_limiter_manager:find_bucket(Id, Type) of
{ok, Bucket} ->
{ok, Bucket, Cfg};
_ ->
?SLOG(error, #{msg => "bucket not found", type => Type, id => Id}),
{error, invalid_bucket}
end;
%% this is a node-level reference
find_referenced_bucket(Id, Type, _) ->
case emqx:get_config([limiter, Type], undefined) of
#{rate := infinity} ->
false;
undefined ->
?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
{error, invalid_bucket};
NodeCfg ->
{ok, Bucket} = emqx_limiter_manager:find_root(Type),
{ok, Bucket, NodeCfg}
end.

View File

@ -494,7 +494,7 @@ esockd_opts(ListenerId, Type, Opts0) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
Opts2 =
case maps:get(connection, Limiter, undefined) of
case emqx_limiter_schema:extract_with_type(connection, Limiter) of
undefined ->
Opts1;
BucketCfg ->
@ -639,7 +639,7 @@ zone(Opts) ->
maps:get(zone, Opts, undefined).
limiter(Opts) ->
maps:get(limiter, Opts, #{}).
maps:get(limiter, Opts, undefined).
add_limiter_bucket(Id, #{limiter := Limiter}) ->
maps:fold(

View File

@ -38,6 +38,7 @@
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(RATE(Rate), to_rate(Rate)).
-define(NOW, erlang:system_time(millisecond)).
-define(ROOT_COUNTER_IDX, 1).
%%--------------------------------------------------------------------
%% Setups
@ -211,11 +212,11 @@ t_infinity_client(_) ->
end,
with_per_client(Fun, Case).
t_try_restore_agg(_) ->
t_try_restore_with_bucket(_) ->
Fun = fun(#{client := Cli} = Bucket) ->
Bucket2 = Bucket#{
rate := 1,
burst := 199,
rate := 100,
burst := 100,
initial := 50
},
Cli2 = Cli#{
@ -394,38 +395,6 @@ t_burst(_) ->
Case
).
t_limit_global_with_unlimit_other(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
end,
Bucket = fun(#{client := Cli} = Bucket) ->
Bucket2 = Bucket#{
rate := infinity,
initial := 0,
burst := 0
},
Cli2 = Cli#{
rate := infinity,
burst := 0,
initial := 0
},
Bucket2#{client := Cli2}
end,
Case = fun() ->
C1 = counters:new(1, []),
start_client({b1, Bucket}, ?NOW + 2000, C1, 20),
timer:sleep(2200),
check_average_rate(C1, 2, 600)
end,
with_global(
GlobalMod,
[{b1, Bucket}],
Case
).
%%--------------------------------------------------------------------
%% Test Cases container
%%--------------------------------------------------------------------
@ -454,38 +423,6 @@ t_check_container(_) ->
end,
with_per_client(Cfg, Case).
%%--------------------------------------------------------------------
%% Test Override
%%--------------------------------------------------------------------
t_bucket_no_client(_) ->
Rate = ?RATE("1/s"),
GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
Cfg#{client := Client#{message_routing := MR#{rate := Rate}}}
end,
BucketMod = fun(Bucket) ->
maps:remove(client, Bucket)
end,
Case = fun() ->
Limiter = connect(BucketMod(make_limiter_cfg())),
?assertMatch(#{rate := Rate}, Limiter)
end,
with_global(GlobalMod, [BucketMod], Case).
t_bucket_client(_) ->
GlobalRate = ?RATE("1/s"),
BucketRate = ?RATE("10/s"),
GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) ->
Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}}
end,
BucketMod = fun(#{client := Client} = Bucket) ->
Bucket#{client := Client#{rate := BucketRate}}
end,
Case = fun() ->
Limiter = connect(BucketMod(make_limiter_cfg())),
?assertMatch(#{rate := BucketRate}, Limiter)
end,
with_global(GlobalMod, [BucketMod], Case).
%%--------------------------------------------------------------------
%% Test Cases misc
%%--------------------------------------------------------------------
@ -574,7 +511,7 @@ t_schema_unit(_) ->
?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")),
ok.
compatibility_for_capacity(_) ->
t_compatibility_for_capacity(_) ->
CfgStr = <<
""
"\n"
@ -594,7 +531,7 @@ compatibility_for_capacity(_) ->
parse_and_check(CfgStr)
).
compatibility_for_message_in(_) ->
t_compatibility_for_message_in(_) ->
CfgStr = <<
""
"\n"
@ -614,7 +551,7 @@ compatibility_for_message_in(_) ->
parse_and_check(CfgStr)
).
compatibility_for_bytes_in(_) ->
t_compatibility_for_bytes_in(_) ->
CfgStr = <<
""
"\n"
@ -634,6 +571,174 @@ compatibility_for_bytes_in(_) ->
parse_and_check(CfgStr)
).
t_extract_with_type(_) ->
IsOnly = fun
(_Key, Cfg) when map_size(Cfg) =/= 1 ->
false;
(Key, Cfg) ->
maps:is_key(Key, Cfg)
end,
Checker = fun
(Type, #{client := Client} = Cfg) ->
Cfg2 = maps:remove(client, Cfg),
IsOnly(Type, Client) andalso
(IsOnly(Type, Cfg2) orelse
map_size(Cfg2) =:= 0);
(Type, Cfg) ->
IsOnly(Type, Cfg)
end,
?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)),
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
messages => #{rate => 1}, bytes => #{rate => 1}
})
)
),
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
messages => #{rate => 1},
bytes => #{rate => 1},
client => #{messages => #{rate => 2}}
})
)
),
?assert(
Checker(
messages,
emqx_limiter_schema:extract_with_type(messages, #{
client => #{messages => #{rate => 2}, bytes => #{rate => 1}}
})
)
).
%%--------------------------------------------------------------------
%% Test Cases Create Instance
%%--------------------------------------------------------------------
t_create_instance_with_infinity_node(_) ->
emqx_limiter_manager:insert_bucket(?FUNCTION_NAME, bytes, ?FUNCTION_NAME),
Cases = make_create_test_data_with_infinity_node(?FUNCTION_NAME),
lists:foreach(
fun({Cfg, Expected}) ->
{ok, Result} = emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg),
IsMatched =
case is_atom(Expected) of
true ->
Result =:= Expected;
_ ->
Expected(Result)
end,
?assert(
IsMatched,
lists:flatten(
io_lib:format("Got unexpected:~p~n, Cfg:~p~n", [
Result, Cfg
])
)
)
end,
Cases
),
emqx_limiter_manager:delete_bucket(?FUNCTION_NAME, bytes),
ok.
t_not_exists_instance(_) ->
Cfg = #{bytes => #{rate => 100, burst => 0, initial => 0}},
?assertEqual(
{error, invalid_bucket},
emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg)
),
?assertEqual(
{error, invalid_bucket},
emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
),
ok.
t_create_instance_with_node(_) ->
GlobalMod = fun(#{message_routing := MR} = Cfg) ->
Cfg#{
message_routing := MR#{rate := ?RATE("200/1s")},
messages := MR#{rate := ?RATE("200/1s")}
}
end,
B1 = fun(Bucket) ->
Bucket#{rate := ?RATE("400/1s")}
end,
B2 = fun(Bucket) ->
Bucket#{rate := infinity}
end,
IsRefLimiter = fun
({ok, #{tokens := _}}, _IsRoot) ->
false;
({ok, #{bucket := #{index := ?ROOT_COUNTER_IDX}}}, true) ->
true;
({ok, #{bucket := #{index := Index}}}, false) when Index =/= ?ROOT_COUNTER_IDX ->
true;
(Result, _IsRoot) ->
ct:pal("The result is:~p~n", [Result]),
false
end,
Case = fun() ->
BucketCfg = make_limiter_cfg(),
?assert(
IsRefLimiter(emqx_limiter_server:connect(b1, message_routing, B1(BucketCfg)), false)
),
?assert(
IsRefLimiter(emqx_limiter_server:connect(b2, message_routing, B2(BucketCfg)), true)
),
?assert(IsRefLimiter(emqx_limiter_server:connect(x, messages, undefined), true)),
?assertNot(IsRefLimiter(emqx_limiter_server:connect(x, bytes, undefined), false))
end,
with_global(
GlobalMod,
[{b1, B1}, {b2, B2}],
Case
),
ok.
%%--------------------------------------------------------------------
%% Test Cases emqx_esockd_htb_limiter
%%--------------------------------------------------------------------
t_create_esockd_htb_limiter(_) ->
Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, undefined),
?assertMatch(
#{module := _, id := ?FUNCTION_NAME, type := bytes, bucket := undefined},
Opts
),
Limiter = emqx_esockd_htb_limiter:create(Opts),
?assertMatch(
#{module := _, name := bytes, limiter := infinity},
Limiter
),
?assertEqual(ok, emqx_esockd_htb_limiter:delete(Limiter)),
ok.
t_esockd_htb_consume(_) ->
ClientCfg = emqx_limiter_schema:default_client_config(),
Cfg = #{client => #{bytes => ClientCfg#{rate := 50, max_retry_time := 0}}},
Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, Cfg),
Limiter = emqx_esockd_htb_limiter:create(Opts),
C1R = emqx_esockd_htb_limiter:consume(51, Limiter),
?assertMatch({pause, _Ms, _Limiter2}, C1R),
timer:sleep(300),
C2R = emqx_esockd_htb_limiter:consume(50, Limiter),
?assertMatch({ok, _}, C2R),
ok.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
@ -877,3 +982,64 @@ apply_modifier(Pairs, #{default := Template}) ->
parse_and_check(ConfigString) ->
ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString),
emqx:get_config([listeners, tcp, default, limiter]).
make_create_test_data_with_infinity_node(FakeInstnace) ->
Infinity = emqx_htb_limiter:make_infinity_limiter(),
ClientCfg = emqx_limiter_schema:default_client_config(),
InfinityRef = emqx_limiter_bucket_ref:infinity_bucket(),
MkC = fun(Rate) ->
#{client => #{bytes => ClientCfg#{rate := Rate}}}
end,
MkB = fun(Rate) ->
#{bytes => #{rate => Rate, burst => 0, initial => 0}}
end,
MkA = fun(Client, Bucket) ->
maps:merge(MkC(Client), MkB(Bucket))
end,
IsRefLimiter = fun(Expected) ->
fun
(#{tokens := _}) -> false;
(#{bucket := Bucket}) -> Bucket =:= Expected;
(_) -> false
end
end,
IsTokenLimiter = fun(Expected) ->
fun
(#{tokens := _, bucket := Bucket}) -> Bucket =:= Expected;
(_) -> false
end
end,
[
%% default situation, no limiter setting
{undefined, Infinity},
%% client = undefined bucket = undefined
{#{}, Infinity},
%% client = undefined bucket = infinity
{MkB(infinity), Infinity},
%% client = undefined bucket = other
{MkB(100), IsRefLimiter(FakeInstnace)},
%% client = infinity bucket = undefined
{MkC(infinity), Infinity},
%% client = infinity bucket = infinity
{MkA(infinity, infinity), Infinity},
%% client = infinity bucket = other
{MkA(infinity, 100), IsRefLimiter(FakeInstnace)},
%% client = other bucket = undefined
{MkC(100), IsTokenLimiter(InfinityRef)},
%% client = other bucket = infinity
{MkC(100), IsTokenLimiter(InfinityRef)},
%% client = C bucket = B C < B
{MkA(100, 1000), IsTokenLimiter(FakeInstnace)},
%% client = C bucket = B C > B
{MkA(1000, 100), IsRefLimiter(FakeInstnace)}
].

View File

@ -0,0 +1,3 @@
Improve the configuration of the limiter.
- Simplify the memory representation of the limiter configuration.
- Make sure the node-level limiter can really work when the listener's limiter configuration is omitted.