From ce46cb9216efe56f875db5b7ef1a2498e06ee771 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 22 Jul 2022 18:59:02 +0800 Subject: [PATCH] fix(limiter): fix test case error --- .../emqx_limiter/src/emqx_limiter_schema.erl | 5 +++- .../emqx_limiter/src/emqx_limiter_server.erl | 29 ++++++++++-------- apps/emqx/src/emqx_listeners.erl | 4 +-- apps/emqx/test/emqx_channel_SUITE.erl | 14 +++++---- apps/emqx/test/emqx_connection_SUITE.erl | 13 ++++---- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 30 ++++++++++--------- apps/emqx/test/emqx_ws_connection_SUITE.erl | 29 ++++++++++-------- 7 files changed, 71 insertions(+), 53 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index f5e90a7e8..23d4d4e4c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -102,7 +102,10 @@ fields(limiter) -> ?R_REF(client_fields), #{ desc => ?DESC(client), - default => #{} + default => maps:from_list([ + {erlang:atom_to_binary(Type), #{}} + || Type <- types() + ]) } )} ]; diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 66cafa7dc..c5e919296 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -124,19 +124,22 @@ connect(Id, Type, Cfg) -> #{ rate := BucketRate, capacity := BucketSize - } = BucketCfg, + }, #{rate := CliRate, capacity := CliSize} = ClientCfg } -> - {ok, - if - CliRate < BucketRate orelse CliSize < BucketSize -> - emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, BucketCfg); - true -> - emqx_htb_limiter:make_ref_limiter(ClientCfg, BucketCfg) - end}; - undefined -> - ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), - {error, invalid_bucket} + case emqx_limiter_manager:find_bucket(Id, Type) of + {ok, Bucket} -> + {ok, + if + CliRate < BucketRate orelse CliSize < BucketSize -> + emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket); + true -> + emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket) + end}; + undefined -> + ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), + {error, invalid_bucket} + end end. -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. @@ -523,7 +526,7 @@ make_bucket( _ = put_to_counter(Counter, NewIndex, Initial), Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate), emqx_limiter_manager:insert_bucket(Id, Type, Ref), - State#{buckets := Buckets#{Id => Bucket}}. + State#{buckets := Buckets#{Id => Bucket}, index := NewIndex}. do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) -> case maps:get(Id, Buckets, undefined) of @@ -568,7 +571,7 @@ find_limiter_cfg(Type, #{rate := _} = Cfg) -> {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; find_limiter_cfg(Type, Cfg) -> { - maps:get(Type, Cfg), + maps:get(Type, Cfg, undefined), find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined)) }. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index fdbd5350e..9ccc5f2df 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -538,13 +538,13 @@ zone(Opts) -> limiter(Opts) -> maps:get(limiter, Opts, #{}). -add_limiter_bucket(Id, #{limiter := Limiters}) -> +add_limiter_bucket(Id, #{limiter := Limiter}) -> maps:fold( fun(Type, Cfg, _) -> emqx_limiter_server:add_bucket(Id, Type, Cfg) end, ok, - Limiters + maps:without([client], Limiter) ); add_limiter_bucket(_Id, _Cfg) -> ok. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 03be7448c..df1720772 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -1205,9 +1205,7 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()). -limiter_cfg() -> #{message_routing => make_limiter_cfg()}. - -make_limiter_cfg() -> +limiter_cfg() -> Client = #{ rate => 5, initial => 0, @@ -1217,10 +1215,16 @@ make_limiter_cfg() -> max_retry_time => timer:seconds(5), failure_strategy => force }, - #{client => Client, rate => 10, initial => 0, capacity => 10}. + #{ + message_routing => bucket_cfg(), + client => #{message_routing => Client} + }. + +bucket_cfg() -> + #{rate => 10, initial => 0, capacity => 10}. add_bucket() -> - emqx_limiter_server:add_bucket(?MODULE, message_routing, make_limiter_cfg()). + emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()). del_bucket() -> emqx_limiter_server:del_bucket(?MODULE, message_routing). diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 141dbdad6..c5dfdf34a 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -708,11 +708,8 @@ init_limiter() -> emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()). limiter_cfg() -> - Cfg = make_limiter_cfg(), - #{bytes_in => Cfg, message_in => Cfg}. - -make_limiter_cfg() -> Infinity = emqx_limiter_schema:infinity_value(), + Cfg = bucket_cfg(), Client = #{ rate => Infinity, initial => 0, @@ -722,10 +719,14 @@ make_limiter_cfg() -> max_retry_time => timer:seconds(5), failure_strategy => force }, - #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. + +bucket_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + #{rate => Infinity, initial => 0, capacity => Infinity}. add_bucket() -> - Cfg = make_limiter_cfg(), + Cfg = bucket_cfg(), emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index e31a220e9..7efcbaa18 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -310,8 +310,8 @@ t_capacity(_) -> %% Test Cases Global Level %%-------------------------------------------------------------------- t_collaborative_alloc(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} + GlobalMod = fun(#{message_routing := MR} = Cfg) -> + Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}} end, Bucket1 = fun(#{client := Cli} = Bucket) -> @@ -350,10 +350,12 @@ t_collaborative_alloc(_) -> ). t_burst(_) -> - GlobalMod = fun(Cfg) -> + GlobalMod = fun(#{message_routing := MR} = Cfg) -> Cfg#{ - rate := ?RATE("200/1s"), - burst := ?RATE("400/1s") + message_routing := MR#{ + rate := ?RATE("200/1s"), + burst := ?RATE("400/1s") + } } end, @@ -391,8 +393,8 @@ t_burst(_) -> ). t_limit_global_with_unlimit_other(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} + GlobalMod = fun(#{message_routing := MR} = Cfg) -> + Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}} end, Bucket = fun(#{client := Cli} = Bucket) -> @@ -433,11 +435,11 @@ t_check_container(_) -> capacity := 1000 } end, - Case = fun(BucketCfg) -> + Case = fun(#{client := Client} = BucketCfg) -> C1 = emqx_limiter_container:get_limiter_by_types( ?MODULE, [message_routing], - #{message_routing => BucketCfg} + #{message_routing => BucketCfg, client => #{message_routing => Client}} ), {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), @@ -455,8 +457,8 @@ t_check_container(_) -> %%-------------------------------------------------------------------- t_bucket_no_client(_) -> Rate = ?RATE("1/s"), - GlobalMod = fun(#{client := Client} = Cfg) -> - Cfg#{client := Client#{rate := Rate}} + GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> + Cfg#{client := Client#{message_routing := MR#{rate := Rate}}} end, BucketMod = fun(Bucket) -> maps:remove(client, Bucket) @@ -470,8 +472,8 @@ t_bucket_no_client(_) -> t_bucket_client(_) -> GlobalRate = ?RATE("1/s"), BucketRate = ?RATE("10/s"), - GlobalMod = fun(#{client := Client} = Cfg) -> - Cfg#{client := Client#{rate := GlobalRate}} + GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> + Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}} end, BucketMod = fun(#{client := Client} = Bucket) -> Bucket#{client := Client#{rate := BucketRate}} @@ -682,7 +684,7 @@ to_rate(Str) -> Rate. with_global(Modifier, Buckets, Case) -> - with_config([limiter, message_routing], Modifier, Buckets, Case). + with_config([limiter], Modifier, Buckets, Case). with_bucket(Modifier, Case) -> Cfg = Modifier(make_limiter_cfg()), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 47591bf64..47efc1829 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -509,8 +509,9 @@ t_handle_timeout_emit_stats(_) -> t_ensure_rate_limit(_) -> {ok, Rate} = emqx_limiter_schema:to_rate("50MB"), Limiter = init_limiter(#{ - bytes_in => make_limiter_cfg(Rate), - message_in => make_limiter_cfg() + bytes_in => bucket_cfg(), + message_in => bucket_cfg(), + client => #{bytes_in => client_cfg(Rate)} }), St = st(#{limiter => Limiter}), @@ -698,28 +699,32 @@ init_limiter(LimiterCfg) -> emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg). limiter_cfg() -> - Cfg = make_limiter_cfg(), - #{bytes_in => Cfg, message_in => Cfg}. + Cfg = bucket_cfg(), + Client = client_cfg(), + #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. -make_limiter_cfg() -> +client_cfg() -> Infinity = emqx_limiter_schema:infinity_value(), - make_limiter_cfg(Infinity). + client_cfg(Infinity). -make_limiter_cfg(ClientRate) -> +client_cfg(Rate) -> Infinity = emqx_limiter_schema:infinity_value(), - Client = #{ - rate => ClientRate, + #{ + rate => Rate, initial => 0, capacity => Infinity, low_watermark => 1, divisible => false, max_retry_time => timer:seconds(5), failure_strategy => force - }, - #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + }. + +bucket_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + #{rate => Infinity, initial => 0, capacity => Infinity}. add_bucket() -> - Cfg = make_limiter_cfg(), + Cfg = bucket_cfg(), emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg).