diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 53f26deb5..bcd4166af 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -22,7 +22,7 @@ %% API -export([ - make_token_bucket_limiter/2, + make_local_limiter/2, make_ref_limiter/2, check/2, consume/2, @@ -32,12 +32,11 @@ make_future/1, available/1 ]). --export_type([token_bucket_limiter/0]). +-export_type([local_limiter/0]). -%% a token bucket limiter with a limiter server's bucket reference - -%% the number of tokens currently available --type token_bucket_limiter() :: #{ +%% a token bucket limiter which may or not contains a reference to another limiter, +%% and can be used in a client alone +-type local_limiter() :: #{ tokens := non_neg_integer(), rate := decimal(), capacity := decimal(), @@ -58,12 +57,12 @@ retry_ctx => undefined %% the retry context - | retry_context(token_bucket_limiter()), + | retry_context(local_limiter()), %% allow to add other keys atom => any() }. -%% a limiter server's bucket reference +%% a limiter instance which only contains a reference to another limiter(bucket) -type ref_limiter() :: #{ max_retry_time := non_neg_integer(), failure_strategy := failure_strategy(), @@ -88,7 +87,7 @@ }. -type bucket() :: emqx_limiter_bucket_ref:bucket_ref(). --type limiter() :: token_bucket_limiter() | ref_limiter() | infinity. +-type limiter() :: local_limiter() | ref_limiter() | infinity. -type millisecond() :: non_neg_integer(). -type pause_type() :: pause | partial. @@ -116,7 +115,7 @@ rate := decimal(), initial := non_neg_integer(), low_watermark := non_neg_integer(), - capacity := decimal(), + burst := decimal(), divisible := boolean(), max_retry_time := non_neg_integer(), failure_strategy := failure_strategy() @@ -134,8 +133,8 @@ %% API %%-------------------------------------------------------------------- %%@doc create a limiter --spec make_token_bucket_limiter(limiter_bucket_cfg(), bucket()) -> _. -make_token_bucket_limiter(Cfg, Bucket) -> +-spec make_local_limiter(limiter_bucket_cfg(), bucket()) -> _. +make_local_limiter(Cfg, Bucket) -> Cfg#{ tokens => emqx_limiter_server:get_initial_val(Cfg), lasttime => ?NOW, @@ -312,8 +311,8 @@ on_failure(throw, Limiter) -> Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]), erlang:throw({rate_check_fail, Message}). --spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> - inner_check_result(token_bucket_limiter()). +-spec do_check_with_parent_limiter(pos_integer(), local_limiter()) -> + inner_check_result(local_limiter()). do_check_with_parent_limiter( Need, #{ @@ -336,7 +335,7 @@ do_check_with_parent_limiter( ) end. --spec do_reset(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()). +-spec do_reset(pos_integer(), local_limiter()) -> inner_check_result(local_limiter()). do_reset( Need, #{ diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 297bdffb0..40061e0b9 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -30,6 +30,12 @@ post_config_update/5 ]). +-export([ + find_root/1, + insert_root/2, + delete_root/1 +]). + -export([ start_server/1, start_server/2, @@ -62,6 +68,7 @@ -define(UID(Id, Type), {Id, Type}). -define(TAB, emqx_limiter_counters). +-define(ROOT_ID, root). %%-------------------------------------------------------------------- %% API @@ -104,9 +111,25 @@ insert_bucket(Id, Type, Bucket) -> ). -spec delete_bucket(limiter_id(), limiter_type()) -> true. -delete_bucket(Type, Id) -> +delete_bucket(Id, Type) -> ets:delete(?TAB, ?UID(Id, Type)). +-spec find_root(limiter_type()) -> + {ok, bucket_ref()} | undefined. +find_root(Type) -> + find_bucket(?ROOT_ID, Type). + +-spec insert_root( + limiter_type(), + bucket_ref() +) -> boolean(). +insert_root(Type, Bucket) -> + insert_bucket(?ROOT_ID, Type, Bucket). + +-spec delete_root(limiter_type()) -> true. +delete_root(Type) -> + delete_bucket(?ROOT_ID, Type). + post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) -> Types = lists:delete(client, maps:keys(NewConf)), _ = [on_post_config_update(Type, NewConf) || Type <- Types], diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index ae8529470..40b23415c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -32,7 +32,9 @@ get_bucket_cfg_path/2, desc/1, types/0, - calc_capacity/1 + calc_capacity/1, + extract_with_type/2, + default_client_config/0 ]). -define(KILOBYTE, 1024). @@ -94,30 +96,33 @@ namespace() -> limiter. roots() -> - [{limiter, hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{importance => ?IMPORTANCE_HIDDEN})}]. + [ + {limiter, + hoconsc:mk(hoconsc:ref(?MODULE, limiter), #{ + importance => ?IMPORTANCE_HIDDEN + })} + ]. fields(limiter) -> [ {Type, ?HOCON(?R_REF(node_opts), #{ desc => ?DESC(Type), - default => #{}, importance => ?IMPORTANCE_HIDDEN, aliases => alias_of_type(Type) })} || Type <- types() ] ++ [ + %% This is an undocumented feature, and it won't be support anymore {client, ?HOCON( ?R_REF(client_fields), #{ desc => ?DESC(client), importance => ?IMPORTANCE_HIDDEN, - default => maps:from_list([ - {erlang:atom_to_binary(Type), #{}} - || Type <- types() - ]) + required => {false, recursively}, + deprecated => {since, "5.0.25"} } )} ]; @@ -131,7 +136,7 @@ fields(node_opts) -> })} ]; fields(client_fields) -> - client_fields(types(), #{default => #{}}); + client_fields(types()); fields(bucket_opts) -> fields_of_bucket(<<"infinity">>); fields(client_opts) -> @@ -194,7 +199,7 @@ fields(client_opts) -> fields(listener_fields) -> composite_bucket_fields(?LISTENER_BUCKET_KEYS, listener_client_fields); fields(listener_client_fields) -> - client_fields(?LISTENER_BUCKET_KEYS, #{required => false}); + client_fields(?LISTENER_BUCKET_KEYS); fields(Type) -> simple_bucket_field(Type). @@ -236,6 +241,31 @@ calc_capacity(#{rate := infinity}) -> calc_capacity(#{rate := Rate, burst := Burst}) -> erlang:floor(1000 * Rate / default_period()) + Burst. +extract_with_type(_Type, undefined) -> + undefined; +extract_with_type(Type, #{client := ClientCfg} = BucketCfg) -> + BucketVal = maps:find(Type, BucketCfg), + ClientVal = maps:find(Type, ClientCfg), + merge_client_bucket(Type, ClientVal, BucketVal); +extract_with_type(Type, BucketCfg) -> + BucketVal = maps:find(Type, BucketCfg), + merge_client_bucket(Type, undefined, BucketVal). + +%% Since the client configuration can be absent and be a undefined value, +%% but we must need some basic settings to control the behaviour of the limiter, +%% so here add this helper function to generate a default setting. +%% This is a temporary workaround until we found a better way to simplify. +default_client_config() -> + #{ + rate => infinity, + initial => 0, + low_watermark => 0, + burst => 0, + divisible => false, + max_retry_time => timer:seconds(10), + failure_strategy => force + }. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -362,7 +392,7 @@ simple_bucket_field(Type) when is_atom(Type) -> ?R_REF(?MODULE, client_opts), #{ desc => ?DESC(client), - required => false, + required => {false, recursively}, importance => importance_of_type(Type), aliases => alias_of_type(Type) } @@ -375,7 +405,7 @@ composite_bucket_fields(Types, ClientRef) -> {Type, ?HOCON(?R_REF(?MODULE, bucket_opts), #{ desc => ?DESC(?MODULE, Type), - required => false, + required => {false, recursively}, importance => importance_of_type(Type), aliases => alias_of_type(Type) })} @@ -387,7 +417,7 @@ composite_bucket_fields(Types, ClientRef) -> ?R_REF(?MODULE, ClientRef), #{ desc => ?DESC(client), - required => false + required => {false, recursively} } )} ]. @@ -410,11 +440,12 @@ fields_of_bucket(Default) -> })} ]. -client_fields(Types, Meta) -> +client_fields(Types) -> [ {Type, - ?HOCON(?R_REF(client_opts), Meta#{ + ?HOCON(?R_REF(client_opts), #{ desc => ?DESC(Type), + required => false, importance => importance_of_type(Type), aliases => alias_of_type(Type) })} @@ -436,3 +467,12 @@ alias_of_type(bytes) -> [bytes_in]; alias_of_type(_) -> []. + +merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) -> + #{Type => BucketVal, client => #{Type => ClientVal}}; +merge_client_bucket(Type, {ok, ClientVal}, _) -> + #{client => #{Type => ClientVal}}; +merge_client_bucket(Type, _, {ok, BucketVal}) -> + #{Type => BucketVal}; +merge_client_bucket(_, _, _) -> + undefined. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 58db66f82..2867283d6 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -59,7 +59,8 @@ burst := rate(), %% token generation interval(second) period := pos_integer(), - produced := float() + produced := float(), + correction := emqx_limiter_decimal:zero_or_float() }. -type bucket() :: #{ @@ -98,6 +99,7 @@ %% minimum coefficient for overloaded limiter -define(OVERLOAD_MIN_ALLOC, 0.3). -define(COUNTER_SIZE, 8). +-define(ROOT_COUNTER_IDX, 1). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -110,47 +112,24 @@ -spec connect( limiter_id(), limiter_type(), - bucket_name() | #{limiter_type() => bucket_name() | undefined} + hocons:config() | undefined ) -> {ok, emqx_htb_limiter:limiter()} | {error, _}. -%% If no bucket path is set in config, there will be no limit -connect(_Id, _Type, undefined) -> - {ok, emqx_htb_limiter:make_infinity_limiter()}; +%% undefined is the default situation, no limiter setting by default +connect(Id, Type, undefined) -> + create_limiter(Id, Type, undefined, undefined); +connect(Id, Type, #{rate := _} = Cfg) -> + create_limiter(Id, Type, maps:get(client, Cfg, undefined), Cfg); connect(Id, Type, Cfg) -> - case find_limiter_cfg(Type, Cfg) of - {_ClientCfg, undefined, _NodeCfg} -> - {ok, emqx_htb_limiter:make_infinity_limiter()}; - {#{rate := infinity}, #{rate := infinity}, #{rate := infinity}} -> - {ok, emqx_htb_limiter:make_infinity_limiter()}; - {ClientCfg, #{rate := infinity}, #{rate := infinity}} -> - {ok, - emqx_htb_limiter:make_token_bucket_limiter( - ClientCfg, emqx_limiter_bucket_ref:infinity_bucket() - )}; - { - #{rate := CliRate} = ClientCfg, - #{rate := BucketRate} = BucketCfg, - _ - } -> - case emqx_limiter_manager:find_bucket(Id, Type) of - {ok, Bucket} -> - BucketSize = emqx_limiter_schema:calc_capacity(BucketCfg), - CliSize = emqx_limiter_schema:calc_capacity(ClientCfg), - {ok, - if - CliRate < BucketRate orelse CliSize < BucketSize -> - emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket); - true -> - emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket) - end}; - undefined -> - ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), - {error, invalid_bucket} - end - end. + create_limiter( + Id, + Type, + emqx_utils_maps:deep_get([client, Type], Cfg, undefined), + maps:get(Type, Cfg, undefined) + ). -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. -add_bucket(_Id, _Type, undefine) -> +add_bucket(_Id, _Type, undefined) -> ok; add_bucket(Id, Type, Cfg) -> ?CALL(Type, {add_bucket, Id, Cfg}). @@ -288,7 +267,8 @@ handle_info(Info, State) -> Reason :: normal | shutdown | {shutdown, term()} | term(), State :: term() ) -> any(). -terminate(_Reason, _State) -> +terminate(_Reason, #{type := Type}) -> + emqx_limiter_manager:delete_root(Type), ok. %%-------------------------------------------------------------------- @@ -343,10 +323,14 @@ oscillation( oscillate(Interval), Ordereds = get_ordered_buckets(Buckets), {Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets), - maybe_burst(State#{ - buckets := Buckets2, - root := Root#{produced := Produced + Alloced} - }). + State2 = maybe_adjust_root_tokens( + State#{ + buckets := Buckets2, + root := Root#{produced := Produced + Alloced} + }, + Alloced + ), + maybe_burst(State2). %% @doc horizontal spread -spec transverse( @@ -419,6 +403,24 @@ get_ordered_buckets(Buckets) -> Buckets ). +-spec maybe_adjust_root_tokens(state(), float()) -> state(). +maybe_adjust_root_tokens(#{root := #{rate := infinity}} = State, _Alloced) -> + State; +maybe_adjust_root_tokens(#{root := #{rate := Rate}} = State, Alloced) when Alloced >= Rate -> + State; +maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} = State, Alloced) -> + InFlow = Rate - Alloced, + Token = counters:get(Counter, ?ROOT_COUNTER_IDX), + case Token >= Rate of + true -> + State; + _ -> + Available = erlang:min(Rate - Token, InFlow), + {Inc, Root2} = emqx_limiter_correction:add(Available, Root), + counters:add(Counter, ?ROOT_COUNTER_IDX, Inc), + State#{root := Root2} + end. + -spec maybe_burst(state()) -> state(). maybe_burst( #{ @@ -482,12 +484,16 @@ init_tree(Type) when is_atom(Type) -> Cfg = emqx:get_config([limiter, Type]), init_tree(Type, Cfg). -init_tree(Type, Cfg) -> +init_tree(Type, #{rate := Rate} = Cfg) -> + Counter = counters:new(?COUNTER_SIZE, [write_concurrency]), + RootBucket = emqx_limiter_bucket_ref:new(Counter, ?ROOT_COUNTER_IDX, Rate), + emqx_limiter_manager:insert_root(Type, RootBucket), #{ type => Type, root => make_root(Cfg), - counter => counters:new(?COUNTER_SIZE, [write_concurrency]), - index => 0, + counter => Counter, + %% The first slot is reserved for the root + index => ?ROOT_COUNTER_IDX, buckets => #{} }. @@ -497,7 +503,8 @@ make_root(#{rate := Rate, burst := Burst}) -> rate => Rate, burst => Burst, period => emqx_limiter_schema:default_period(), - produced => 0.0 + produced => 0.0, + correction => 0 }. do_add_bucket(_Id, #{rate := infinity}, #{root := #{rate := infinity}} = State) -> @@ -571,25 +578,61 @@ call(Type, Msg) -> gen_server:call(Pid, Msg) end. -find_limiter_cfg(Type, #{rate := _} = Cfg) -> - {find_client_cfg(Type, maps:get(client, Cfg, undefined)), Cfg, find_node_cfg(Type)}; -find_limiter_cfg(Type, Cfg) -> - { - find_client_cfg(Type, emqx_utils_maps:deep_get([client, Type], Cfg, undefined)), - maps:get(Type, Cfg, undefined), - find_node_cfg(Type) - }. +create_limiter(Id, Type, #{rate := Rate} = ClientCfg, BucketCfg) when Rate =/= infinity -> + create_limiter_with_client(Id, Type, ClientCfg, BucketCfg); +create_limiter(Id, Type, _, BucketCfg) -> + create_limiter_without_client(Id, Type, BucketCfg). -find_client_cfg(Type, BucketCfg) -> - NodeCfg = emqx:get_config([limiter, client, Type], undefined), - merge_client_cfg(NodeCfg, BucketCfg). +%% create a limiter with the client-level configuration +create_limiter_with_client(Id, Type, ClientCfg, BucketCfg) -> + case find_referenced_bucket(Id, Type, BucketCfg) of + false -> + {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, infinity)}; + {ok, Bucket, RefCfg} -> + create_limiter_with_ref(Bucket, ClientCfg, RefCfg); + Error -> + Error + end. -merge_client_cfg(undefined, BucketCfg) -> - BucketCfg; -merge_client_cfg(NodeCfg, undefined) -> - NodeCfg; -merge_client_cfg(NodeCfg, BucketCfg) -> - maps:merge(NodeCfg, BucketCfg). +%% create a limiter only with the referenced configuration +create_limiter_without_client(Id, Type, BucketCfg) -> + case find_referenced_bucket(Id, Type, BucketCfg) of + false -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; + {ok, Bucket, RefCfg} -> + ClientCfg = emqx_limiter_schema:default_client_config(), + create_limiter_with_ref(Bucket, ClientCfg, RefCfg); + Error -> + Error + end. -find_node_cfg(Type) -> - emqx:get_config([limiter, Type], #{rate => infinity, burst => 0}). +create_limiter_with_ref( + Bucket, + #{rate := CliRate} = ClientCfg, + #{rate := RefRate} +) when CliRate < RefRate -> + {ok, emqx_htb_limiter:make_local_limiter(ClientCfg, Bucket)}; +create_limiter_with_ref(Bucket, ClientCfg, _) -> + {ok, emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket)}. + +%% this is a listener(server)-level reference +find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -> + case emqx_limiter_manager:find_bucket(Id, Type) of + {ok, Bucket} -> + {ok, Bucket, Cfg}; + _ -> + ?SLOG(error, #{msg => "bucket not found", type => Type, id => Id}), + {error, invalid_bucket} + end; +%% this is a node-level reference +find_referenced_bucket(Id, Type, _) -> + case emqx:get_config([limiter, Type], undefined) of + #{rate := infinity} -> + false; + undefined -> + ?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}), + {error, invalid_bucket}; + NodeCfg -> + {ok, Bucket} = emqx_limiter_manager:find_root(Type), + {ok, Bucket, NodeCfg} + end. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index b3043effc..99ab52f61 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -494,7 +494,7 @@ esockd_opts(ListenerId, Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = - case maps:get(connection, Limiter, undefined) of + case emqx_limiter_schema:extract_with_type(connection, Limiter) of undefined -> Opts1; BucketCfg -> @@ -639,7 +639,7 @@ zone(Opts) -> maps:get(zone, Opts, undefined). limiter(Opts) -> - maps:get(limiter, Opts, #{}). + maps:get(limiter, Opts, undefined). add_limiter_bucket(Id, #{limiter := Limiter}) -> maps:fold( diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 26048873e..67ed8e6bc 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -38,6 +38,7 @@ -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(RATE(Rate), to_rate(Rate)). -define(NOW, erlang:system_time(millisecond)). +-define(ROOT_COUNTER_IDX, 1). %%-------------------------------------------------------------------- %% Setups @@ -211,11 +212,11 @@ t_infinity_client(_) -> end, with_per_client(Fun, Case). -t_try_restore_agg(_) -> +t_try_restore_with_bucket(_) -> Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ - rate := 1, - burst := 199, + rate := 100, + burst := 100, initial := 50 }, Cli2 = Cli#{ @@ -394,38 +395,6 @@ t_burst(_) -> Case ). -t_limit_global_with_unlimit_other(_) -> - GlobalMod = fun(#{message_routing := MR} = Cfg) -> - Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}} - end, - - Bucket = fun(#{client := Cli} = Bucket) -> - Bucket2 = Bucket#{ - rate := infinity, - initial := 0, - burst := 0 - }, - Cli2 = Cli#{ - rate := infinity, - burst := 0, - initial := 0 - }, - Bucket2#{client := Cli2} - end, - - Case = fun() -> - C1 = counters:new(1, []), - start_client({b1, Bucket}, ?NOW + 2000, C1, 20), - timer:sleep(2200), - check_average_rate(C1, 2, 600) - end, - - with_global( - GlobalMod, - [{b1, Bucket}], - Case - ). - %%-------------------------------------------------------------------- %% Test Cases container %%-------------------------------------------------------------------- @@ -454,38 +423,6 @@ t_check_container(_) -> end, with_per_client(Cfg, Case). -%%-------------------------------------------------------------------- -%% Test Override -%%-------------------------------------------------------------------- -t_bucket_no_client(_) -> - Rate = ?RATE("1/s"), - GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> - Cfg#{client := Client#{message_routing := MR#{rate := Rate}}} - end, - BucketMod = fun(Bucket) -> - maps:remove(client, Bucket) - end, - Case = fun() -> - Limiter = connect(BucketMod(make_limiter_cfg())), - ?assertMatch(#{rate := Rate}, Limiter) - end, - with_global(GlobalMod, [BucketMod], Case). - -t_bucket_client(_) -> - GlobalRate = ?RATE("1/s"), - BucketRate = ?RATE("10/s"), - GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> - Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}} - end, - BucketMod = fun(#{client := Client} = Bucket) -> - Bucket#{client := Client#{rate := BucketRate}} - end, - Case = fun() -> - Limiter = connect(BucketMod(make_limiter_cfg())), - ?assertMatch(#{rate := BucketRate}, Limiter) - end, - with_global(GlobalMod, [BucketMod], Case). - %%-------------------------------------------------------------------- %% Test Cases misc %%-------------------------------------------------------------------- @@ -574,7 +511,7 @@ t_schema_unit(_) -> ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")), ok. -compatibility_for_capacity(_) -> +t_compatibility_for_capacity(_) -> CfgStr = << "" "\n" @@ -594,7 +531,7 @@ compatibility_for_capacity(_) -> parse_and_check(CfgStr) ). -compatibility_for_message_in(_) -> +t_compatibility_for_message_in(_) -> CfgStr = << "" "\n" @@ -614,7 +551,7 @@ compatibility_for_message_in(_) -> parse_and_check(CfgStr) ). -compatibility_for_bytes_in(_) -> +t_compatibility_for_bytes_in(_) -> CfgStr = << "" "\n" @@ -634,6 +571,174 @@ compatibility_for_bytes_in(_) -> parse_and_check(CfgStr) ). +t_extract_with_type(_) -> + IsOnly = fun + (_Key, Cfg) when map_size(Cfg) =/= 1 -> + false; + (Key, Cfg) -> + maps:is_key(Key, Cfg) + end, + Checker = fun + (Type, #{client := Client} = Cfg) -> + Cfg2 = maps:remove(client, Cfg), + IsOnly(Type, Client) andalso + (IsOnly(Type, Cfg2) orelse + map_size(Cfg2) =:= 0); + (Type, Cfg) -> + IsOnly(Type, Cfg) + end, + ?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)), + ?assert( + Checker( + messages, + emqx_limiter_schema:extract_with_type(messages, #{ + messages => #{rate => 1}, bytes => #{rate => 1} + }) + ) + ), + ?assert( + Checker( + messages, + emqx_limiter_schema:extract_with_type(messages, #{ + messages => #{rate => 1}, + bytes => #{rate => 1}, + client => #{messages => #{rate => 2}} + }) + ) + ), + ?assert( + Checker( + messages, + emqx_limiter_schema:extract_with_type(messages, #{ + client => #{messages => #{rate => 2}, bytes => #{rate => 1}} + }) + ) + ). + +%%-------------------------------------------------------------------- +%% Test Cases Create Instance +%%-------------------------------------------------------------------- +t_create_instance_with_infinity_node(_) -> + emqx_limiter_manager:insert_bucket(?FUNCTION_NAME, bytes, ?FUNCTION_NAME), + Cases = make_create_test_data_with_infinity_node(?FUNCTION_NAME), + lists:foreach( + fun({Cfg, Expected}) -> + {ok, Result} = emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg), + IsMatched = + case is_atom(Expected) of + true -> + Result =:= Expected; + _ -> + Expected(Result) + end, + ?assert( + IsMatched, + lists:flatten( + io_lib:format("Got unexpected:~p~n, Cfg:~p~n", [ + Result, Cfg + ]) + ) + ) + end, + Cases + ), + emqx_limiter_manager:delete_bucket(?FUNCTION_NAME, bytes), + ok. + +t_not_exists_instance(_) -> + Cfg = #{bytes => #{rate => 100, burst => 0, initial => 0}}, + ?assertEqual( + {error, invalid_bucket}, + emqx_limiter_server:connect(?FUNCTION_NAME, bytes, Cfg) + ), + + ?assertEqual( + {error, invalid_bucket}, + emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg) + ), + ok. + +t_create_instance_with_node(_) -> + GlobalMod = fun(#{message_routing := MR} = Cfg) -> + Cfg#{ + message_routing := MR#{rate := ?RATE("200/1s")}, + messages := MR#{rate := ?RATE("200/1s")} + } + end, + + B1 = fun(Bucket) -> + Bucket#{rate := ?RATE("400/1s")} + end, + + B2 = fun(Bucket) -> + Bucket#{rate := infinity} + end, + + IsRefLimiter = fun + ({ok, #{tokens := _}}, _IsRoot) -> + false; + ({ok, #{bucket := #{index := ?ROOT_COUNTER_IDX}}}, true) -> + true; + ({ok, #{bucket := #{index := Index}}}, false) when Index =/= ?ROOT_COUNTER_IDX -> + true; + (Result, _IsRoot) -> + ct:pal("The result is:~p~n", [Result]), + false + end, + + Case = fun() -> + BucketCfg = make_limiter_cfg(), + + ?assert( + IsRefLimiter(emqx_limiter_server:connect(b1, message_routing, B1(BucketCfg)), false) + ), + ?assert( + IsRefLimiter(emqx_limiter_server:connect(b2, message_routing, B2(BucketCfg)), true) + ), + ?assert(IsRefLimiter(emqx_limiter_server:connect(x, messages, undefined), true)), + ?assertNot(IsRefLimiter(emqx_limiter_server:connect(x, bytes, undefined), false)) + end, + + with_global( + GlobalMod, + [{b1, B1}, {b2, B2}], + Case + ), + ok. + +%%-------------------------------------------------------------------- +%% Test Cases emqx_esockd_htb_limiter +%%-------------------------------------------------------------------- +t_create_esockd_htb_limiter(_) -> + Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, undefined), + ?assertMatch( + #{module := _, id := ?FUNCTION_NAME, type := bytes, bucket := undefined}, + Opts + ), + + Limiter = emqx_esockd_htb_limiter:create(Opts), + ?assertMatch( + #{module := _, name := bytes, limiter := infinity}, + Limiter + ), + + ?assertEqual(ok, emqx_esockd_htb_limiter:delete(Limiter)), + ok. + +t_esockd_htb_consume(_) -> + ClientCfg = emqx_limiter_schema:default_client_config(), + Cfg = #{client => #{bytes => ClientCfg#{rate := 50, max_retry_time := 0}}}, + Opts = emqx_esockd_htb_limiter:new_create_options(?FUNCTION_NAME, bytes, Cfg), + Limiter = emqx_esockd_htb_limiter:create(Opts), + + C1R = emqx_esockd_htb_limiter:consume(51, Limiter), + ?assertMatch({pause, _Ms, _Limiter2}, C1R), + + timer:sleep(300), + C2R = emqx_esockd_htb_limiter:consume(50, Limiter), + ?assertMatch({ok, _}, C2R), + ok. + %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- @@ -877,3 +982,64 @@ apply_modifier(Pairs, #{default := Template}) -> parse_and_check(ConfigString) -> ok = emqx_common_test_helpers:load_config(emqx_schema, ConfigString), emqx:get_config([listeners, tcp, default, limiter]). + +make_create_test_data_with_infinity_node(FakeInstnace) -> + Infinity = emqx_htb_limiter:make_infinity_limiter(), + ClientCfg = emqx_limiter_schema:default_client_config(), + InfinityRef = emqx_limiter_bucket_ref:infinity_bucket(), + MkC = fun(Rate) -> + #{client => #{bytes => ClientCfg#{rate := Rate}}} + end, + MkB = fun(Rate) -> + #{bytes => #{rate => Rate, burst => 0, initial => 0}} + end, + + MkA = fun(Client, Bucket) -> + maps:merge(MkC(Client), MkB(Bucket)) + end, + IsRefLimiter = fun(Expected) -> + fun + (#{tokens := _}) -> false; + (#{bucket := Bucket}) -> Bucket =:= Expected; + (_) -> false + end + end, + + IsTokenLimiter = fun(Expected) -> + fun + (#{tokens := _, bucket := Bucket}) -> Bucket =:= Expected; + (_) -> false + end + end, + + [ + %% default situation, no limiter setting + {undefined, Infinity}, + + %% client = undefined bucket = undefined + {#{}, Infinity}, + %% client = undefined bucket = infinity + {MkB(infinity), Infinity}, + %% client = undefined bucket = other + {MkB(100), IsRefLimiter(FakeInstnace)}, + + %% client = infinity bucket = undefined + {MkC(infinity), Infinity}, + %% client = infinity bucket = infinity + {MkA(infinity, infinity), Infinity}, + + %% client = infinity bucket = other + {MkA(infinity, 100), IsRefLimiter(FakeInstnace)}, + + %% client = other bucket = undefined + {MkC(100), IsTokenLimiter(InfinityRef)}, + + %% client = other bucket = infinity + {MkC(100), IsTokenLimiter(InfinityRef)}, + + %% client = C bucket = B C < B + {MkA(100, 1000), IsTokenLimiter(FakeInstnace)}, + + %% client = C bucket = B C > B + {MkA(1000, 100), IsRefLimiter(FakeInstnace)} + ]. diff --git a/changes/ce/perf-10591.en.md b/changes/ce/perf-10591.en.md new file mode 100644 index 000000000..2e14312d1 --- /dev/null +++ b/changes/ce/perf-10591.en.md @@ -0,0 +1,3 @@ +Improve the configuration of the limiter. +- Simplify the memory representation of the limiter configuration. +- Make sure the node-level limiter can really work when the listener's limiter configuration is omitted.