From d28b34f0d159e0fee057d7fd7e050ec1fb74332a Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 14 Mar 2022 18:15:50 +0800 Subject: [PATCH] fix(limiter): improve test case and fix some bugs --- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 33 +- .../src/emqx_limiter/src/emqx_limiter_app.erl | 3 +- .../src/emqx_limiter_container.erl | 9 +- .../emqx_limiter/src/emqx_limiter_schema.erl | 4 +- .../emqx_limiter/src/emqx_limiter_server.erl | 43 +- .../src/emqx_limiter_server_sup.erl | 1 + apps/emqx/test/emqx_channel_SUITE.erl | 95 ++-- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 431 +++++------------- apps/emqx/test/emqx_ws_connection_SUITE.erl | 20 +- .../src/emqx_dashboard_swagger.erl | 2 - .../src/emqx_retainer_dispatcher.erl | 7 +- .../test/emqx_retainer_SUITE.erl | 17 +- 12 files changed, 241 insertions(+), 424 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 74cd62833..372944666 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -22,7 +22,7 @@ %% API -export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2 - , consume/2, set_retry/2, retry/1, make_infinity_limiter/1 + , consume/2, set_retry/2, retry/1, make_infinity_limiter/0 , make_future/1, available/1 ]). -export_type([token_bucket_limiter/0]). @@ -108,6 +108,8 @@ -import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]). +-elvis([{elvis_style, no_if_expression, disable}]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -124,8 +126,8 @@ make_token_bucket_limiter(Cfg, Bucket) -> make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity -> Cfg#{bucket => Bucket}. --spec make_infinity_limiter(limiter_bucket_cfg()) -> infinity. -make_infinity_limiter(_) -> +-spec make_infinity_limiter() -> infinity. +make_infinity_limiter() -> infinity. %% @doc request some tokens @@ -252,12 +254,11 @@ try_consume(_, _, Limiter) -> -spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) when Limiter :: limiter(). -do_check(Need, #{tokens := Tokens} = Limiter) -> - if Need =< Tokens -> - do_check_with_parent_limiter(Need, Limiter); - true -> - do_reset(Need, Limiter) - end; +do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens -> + do_check_with_parent_limiter(Need, Limiter); + +do_check(Need, #{tokens := _} = Limiter) -> + do_reset(Need, Limiter); do_check(Need, #{divisible := Divisible, bucket := Bucket} = Ref) -> @@ -280,7 +281,8 @@ on_failure(throw, Limiter) -> Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]), erlang:throw({rate_check_fail, Message}). --spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()). +-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> + inner_check_result(token_bucket_limiter()). do_check_with_parent_limiter(Need, #{tokens := Tokens, divisible := Divisible, @@ -306,15 +308,16 @@ do_reset(Need, capacity := Capacity} = Limiter) -> Now = ?NOW, Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), - if Tokens2 >= Need -> + Available = erlang:floor(Tokens2), + if Available >= Need -> Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now}, do_check_with_parent_limiter(Need, Limiter2); - Divisible andalso Tokens2 > 0 -> + Divisible andalso Available > 0 -> %% must be allocated here, because may be Need > Capacity return_pause(Rate, partial, fun do_reset/2, - Need - Tokens2, + Need - Available, Limiter#{tokens := 0, lasttime := Now}); true -> return_pause(Rate, pause, fun do_reset/2, Need, Limiter) @@ -331,8 +334,8 @@ return_pause(Rate, PauseType, Fun, Diff, Limiter) -> Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. --spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> retry_context(Limiter) - when Limiter :: limiter(). +-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> + retry_context(Limiter) when Limiter :: limiter(). make_retry_context(Fun, Diff) -> #{continuation => Fun, diff => Diff}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl index b4a92596f..6e66645f3 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl @@ -39,8 +39,7 @@ {ok, Pid :: pid(), State :: term()} | {error, Reason :: term()}. start(_StartType, _StartArgs) -> - {ok, _} = Result = emqx_limiter_sup:start_link(), - Result. + {ok, _} = emqx_limiter_sup:start_link(). %%-------------------------------------------------------------------- %% @private diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 395174448..65b213485 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -21,7 +21,7 @@ %% @end %% API --export([ new/0, new/1, get_limiter_by_names/2 +-export([ new/0, new/1, new/2, get_limiter_by_names/2 , add_new/3, update_by_name/3, set_retry_context/2 , check/3, retry/2, get_retry_context/1 , check_list/2, retry_list/2 @@ -60,7 +60,12 @@ new() -> %% @doc generate default data according to the type of limiter -spec new(list(limiter_type())) -> container(). new(Types) -> - get_limiter_by_names(Types, #{}). + new(Types, #{}). + +-spec new(list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +new(Types, Names) -> + get_limiter_by_names(Types, Names). %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c1ca45e1b..b9b25e806 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -29,7 +29,7 @@ | message_in | connection | message_routing - | shared. + | batch. -type bucket_name() :: atom(). -type rate() :: infinity | float(). @@ -142,7 +142,7 @@ to_rate(Str, CanInfinity, CanZero) -> {ok, Val} = to_capacity(QuotaStr), check_capacity(Str, Val, CanZero, fun(Quota) -> - Quota * minimum_period() / ?UNIT_TIME_IN_MS + {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} end); [QuotaStr, Interval] -> {ok, Val} = to_capacity(QuotaStr), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 95c8a47ec..896792a32 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -34,7 +34,7 @@ terminate/2, code_change/3, format_status/2]). -export([ start_link/1, connect/2, info/1 - , name/1, get_initial_val/1 + , name/1, get_initial_val/1, update_config/1 ]). -type root() :: #{ rate := rate() %% number of tokens generated per period @@ -85,7 +85,7 @@ emqx_htb_limiter:limiter(). %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> - emqx_htb_limiter:make_infinity_limiter(undefined); + emqx_htb_limiter:make_infinity_limiter(); connect(Type, BucketName) when is_atom(BucketName) -> CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), @@ -101,7 +101,7 @@ connect(Type, BucketName) when is_atom(BucketName) -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); Bucket =:= infinity -> - emqx_htb_limiter:make_infinity_limiter(Cfg); + emqx_htb_limiter:make_infinity_limiter(); true -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; @@ -122,6 +122,10 @@ info(Type) -> name(Type) -> erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). +-spec update_config(limiter_type()) -> ok. +update_config(Type) -> + ?CALL(Type). + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -131,6 +135,7 @@ name(Type) -> start_link(Type) -> gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). + %%-------------------------------------------------------------------- %%% gen_server callbacks %%-------------------------------------------------------------------- @@ -147,16 +152,10 @@ start_link(Type) -> {stop, Reason :: term()} | ignore. init([Type]) -> - State = #{ type => Type - , root => undefined - , counter => undefined - , index => 1 - , buckets => #{} - }, - State2 = init_tree(Type, State), - #{root := #{period := Perido}} = State2, + State = init_tree(Type), + #{root := #{period := Perido}} = State, oscillate(Perido), - {ok, State2}. + {ok, State}. %%-------------------------------------------------------------------- %% @private @@ -176,6 +175,10 @@ init([Type]) -> handle_call(info, _From, State) -> {reply, State, State}; +handle_call(update_config, _From, #{type := Type}) -> + NewState = init_tree(Type), + {reply, ok, NewState}; + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -362,10 +365,11 @@ maybe_burst(State) -> dispatch_burst([], _, State) -> State; -dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) -> +dispatch_burst(Empties, InFlow, + #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) -> EachFlow = InFlow / erlang:length(Empties), {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), - State#{consumed := Consumed + Alloced, buckets := Buckets2}. + State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}. -spec dispatch_burst_to_buckets(list(bucket()), float(), @@ -385,8 +389,15 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> dispatch_burst_to_buckets([], _, Alloced, Buckets) -> {Alloced, Buckets}. --spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). -init_tree(Type, State) -> +-spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). +init_tree(Type) -> + State = #{ type => Type + , root => undefined + , counter => undefined + , index => 1 + , buckets => #{} + }, + #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), {Factor, Root} = make_root(Cfg), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 7f8d227ec..70332481a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -46,6 +46,7 @@ start(Type) -> Spec = make_child(Type), supervisor:start_child(?MODULE, Spec). +%% XXX This is maybe a workaround, not so good -spec restart(emqx_limiter_schema:limiter_type()) -> _. restart(Type) -> Id = emqx_limiter_server:name(Type), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index d97b0197b..256a271b2 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -113,57 +113,32 @@ listener_mqtt_ws_conf() -> listeners_conf() -> #{tcp => #{default => listener_mqtt_tcp_conf()}, ws => #{default => listener_mqtt_ws_conf()} - }. + }. limiter_conf() -> - #{bytes_in => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - connection => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - message_in => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - message_routing => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}}. + Make = fun() -> + #{bucket => + #{default => + #{capacity => infinity, + initial => 0, + rate => infinity, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity + } + } + }, + burst => 0, + rate => infinity + } + end, + + lists:foldl(fun(Name, Acc) -> + Acc#{Name => Make()} + end, + #{}, + [bytes_in, message_in, message_routing, connection, batch]). stats_conf() -> #{enable => true}. @@ -232,7 +207,7 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) -> OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - modify_limiter(TestCase, OldConf), + check_modify_limiter(TestCase), [{config, OldConf}|Config]. end_per_testcase(_TestCase, Config) -> @@ -240,18 +215,19 @@ end_per_testcase(_TestCase, Config) -> emqx_common_test_helpers:stop_apps([]), Config. -modify_limiter(TestCase, NewConf) -> +check_modify_limiter(TestCase) -> Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2], case lists:member(TestCase, Checks) of true -> - modify_limiter(NewConf); + modify_limiter(); _ -> ok end. %% per_client 5/1s,5 %% aggregated 10/1s,10 -modify_limiter(#{limiter := Limiter} = NewConf) -> +modify_limiter() -> + Limiter = emqx_config:get([limiter]), #{message_routing := #{bucket := Bucket} = Routing} = Limiter, #{default := #{per_client := Client} = Default} = Bucket, Client2 = Client#{rate := 5, @@ -259,16 +235,15 @@ modify_limiter(#{limiter := Limiter} = NewConf) -> capacity := 5, low_water_mark := 1}, Default2 = Default#{per_client := Client2, - aggregated := #{rate => 10, - initial => 0, - capacity => 10 - }}, + rate => 10, + initial => 0, + capacity => 10}, Bucket2 = Bucket#{default := Default2}, Routing2 = Routing#{bucket := Bucket2}, - NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}}, - emqx_config:put(NewConf2), + emqx_config:put([limiter], Limiter#{message_routing := Routing2}), emqx_limiter_manager:restart_server(message_routing), + timer:sleep(100), ok. %%-------------------------------------------------------------------- @@ -1078,4 +1053,4 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()). -limiter_cfg() -> #{}. +limiter_cfg() -> #{message_routing => default}. diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index ea2e31700..589e78e8e 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -27,59 +27,37 @@ -define(BASE_CONF, <<""" limiter { bytes_in { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = \"100MB/1s\" - per_client.capacity = infinity + rate = infinity + capacity = infinity } } message_in { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } connection { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } message_routing { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } - shared { + batch { bucket.retainer { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } } @@ -97,6 +75,7 @@ limiter { -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(RATE(Rate), to_rate(Rate)). -define(NOW, erlang:system_time(millisecond)). +-define(CONST(X), fun(_) -> X end). %%-------------------------------------------------------------------- %% Setups @@ -231,12 +210,11 @@ t_low_water_mark(_) -> with_per_client(default, Cfg, Case). t_infinity_client(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - capacity := infinity}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := infinity, + capacity := infinity}, Cli2 = Cli#{rate := infinity, capacity := infinity}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -247,14 +225,13 @@ t_infinity_client(_) -> with_bucket(default, Fun, Case). t_try_restore_agg(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := 1, - capacity := 200, - initial := 50}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := 1, + capacity := 200, + initial := 50}, Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true, max_retry_time := 100, failure_strategy := force}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -267,15 +244,14 @@ t_try_restore_agg(_) -> with_bucket(default, Fun, Case). t_short_board(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/1s"), - initial := 0, - capacity := 100}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/1s"), + initial := 0, + capacity := 100}, Cli2 = Cli#{rate := ?RATE("600/1s"), capacity := 600, initial := 600}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Counter = counters:new(1, []), @@ -286,15 +262,14 @@ t_short_board(_) -> with_bucket(default, Fun, Case). t_rate(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/100ms"), - initial := 0, - capacity := infinity}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/100ms"), + initial := 0, + capacity := infinity}, Cli2 = Cli#{rate := infinity, capacity := infinity, initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -311,113 +286,74 @@ t_rate(_) -> t_capacity(_) -> Capacity = 600, - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/100ms"), - initial := 0, - capacity := 600}, - Cli2 = Cli#{rate := infinity, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/100ms"), + initial := 0, + capacity := 600}, + Cli2 = Cli#{rate := infinity, + capacity := infinity, + initial := 0}, + Bucket2#{per_client := Cli2} end, Case = fun() -> - Client = connect(default), - timer:sleep(1000), + Client = connect(default), + timer:sleep(1000), C1 = emqx_htb_limiter:available(Client), ?assertEqual(Capacity, C1, "test bucket capacity") end, with_bucket(default, Fun, Case). -%%-------------------------------------------------------------------- -%% Test Cases Zone Level -%%-------------------------------------------------------------------- -t_limit_zone_with_unlimit_bucket(_) -> - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := infinity, - initial := 0, - capacity := infinity, - divisible := true}, - Bucket#{aggregated := Aggr2, per_client := Cli2} - end, - - Case = fun() -> - C1 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 600) - end, - - with_zone(default, ZoneMod, [{b1, Bucket}], Case). - - %%-------------------------------------------------------------------- %% Test Cases Global Level %%-------------------------------------------------------------------- -t_burst_and_fairness(_) -> +t_collaborative_alloc(_) -> GlobalMod = fun(Cfg) -> - Cfg#{burst := ?RATE("60/1s")} + Cfg#{rate := ?RATE("600/1s")} end, - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("500/1s"), - initial := 0, - capacity := 500}, - Cli2 = Cli#{rate := ?RATE("600/1s"), - capacity := 600, - initial := 600}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket1 = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("400/1s"), + initial := 0, + capacity := 600}, + Cli2 = Cli#{rate := ?RATE("50"), + capacity := 100, + initial := 100}, + Bucket2#{per_client := Cli2} end, + Bucket2 = fun(Bucket) -> + Bucket2 = Bucket1(Bucket), + Bucket2#{rate := ?RATE("200/1s")} + end, + Case = fun() -> C1 = counters:new(1, []), C2 = counters:new(1, []), start_client(b1, ?NOW + 2000, C1, 20), start_client(b2, ?NOW + 2000, C2, 30), timer:sleep(2100), - check_average_rate(C1, 2, 330), - check_average_rate(C2, 2, 330) + check_average_rate(C1, 2, 300), + check_average_rate(C2, 2, 300) end, with_global(GlobalMod, - default, - ZoneMod, - [{b1, Bucket}, {b2, Bucket}], + [{b1, Bucket1}, {b2, Bucket2}], Case). t_burst(_) -> GlobalMod = fun(Cfg) -> - Cfg#{burst := ?RATE("60/1s")} + Cfg#{rate := ?RATE("200/1s"), + burst := ?RATE("400/1s")} end, - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("60/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("500/1s"), - initial := 0, - capacity := 500}, - Cli2 = Cli#{rate := ?RATE("600/1s"), - capacity := 600, + Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("200/1s"), + initial := 0, + capacity := 200}, + Cli2 = Cli#{rate := ?RATE("50/1s"), + capacity := 200, divisible := true}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> @@ -430,180 +366,39 @@ t_burst(_) -> timer:sleep(2100), Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]), - in_range(Total / 2, 30) + in_range(Total / 2, 300) end, with_global(GlobalMod, - default, - ZoneMod, [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}], Case). - t_limit_global_with_unlimit_other(_) -> GlobalMod = fun(Cfg) -> Cfg#{rate := ?RATE("600/1s")} end, - ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := infinity, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := infinity, + initial := 0, + capacity := infinity}, + Cli2 = Cli#{rate := infinity, + capacity := infinity, + initial := 0}, + Bucket2#{per_client := Cli2} end, Case = fun() -> - C1 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 600) + C1 = counters:new(1, []), + start_client(b1, ?NOW + 2000, C1, 20), + timer:sleep(2100), + check_average_rate(C1, 2, 600) end, with_global(GlobalMod, - default, - ZoneMod, [{b1, Bucket}], Case). -t_multi_zones(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("400/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("500/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 25), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 300), - check_average_rate(C2, 2, 300) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}], - Case). - -%% because the simulated client will try to reach the maximum rate -%% when divisiable = true, a large number of divided tokens will be generated -%% so this is not an accurate test -t_multi_zones_with_divisible(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("400/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("500/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := Rate, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - divisible := true, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 25), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 300), - check_average_rate(C2, 2, 300) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}], - Case). - -t_zone_hunger_and_fair(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("50/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 550), - check_average_rate(C2, 2, 50) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}], - Case). - %%-------------------------------------------------------------------- %% Test Cases container %%-------------------------------------------------------------------- @@ -626,7 +421,8 @@ t_check_container(_) -> capacity := 1000} end, Case = fun() -> - C1 = emqx_limiter_container:new([message_routing]), + C1 = emqx_limiter_container:new([message_routing], + #{message_routing => default}), {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), timer:sleep(Pause), @@ -663,9 +459,7 @@ t_limiter_server(_) -> ?assertMatch(#{root := _, counter := _, index := _, - zones := _, buckets := _, - nodes := _, type := message_routing}, State), Name = emqx_limiter_server:name(message_routing), @@ -675,6 +469,32 @@ t_limiter_server(_) -> ok = emqx_limiter_server:format_status(normal, ok), ok. +t_decimal(_) -> + ?assertEqual(infinity, emqx_limiter_decimal:add(infinity, 3)), + ?assertEqual(5, emqx_limiter_decimal:add(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:sub(infinity, 3)), + ?assertEqual(-1, emqx_limiter_decimal:sub(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:mul(infinity, 3)), + ?assertEqual(6, emqx_limiter_decimal:mul(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:floor_div(infinity, 3)), + ?assertEqual(2, emqx_limiter_decimal:floor_div(7, 3)), + ok. + +t_schema_unit(_) -> + M = emqx_limiter_schema, + ?assertEqual(limiter, M:namespace()), + ?assertEqual({ok, infinity}, M:to_rate(" infinity ")), + ?assertMatch({ok, _}, M:to_rate("100")), + ?assertMatch({error, _}, M:to_rate("0")), + ?assertMatch({ok, _}, M:to_rate("100/10s")), + ?assertMatch({error, _}, M:to_rate("100/10x")), + ?assertEqual({ok, infinity}, M:to_capacity("infinity")), + ?assertEqual({ok, 100}, M:to_capacity("100")), + ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), + ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), + ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")), + ok. + %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- @@ -752,7 +572,6 @@ client_try_check(Need, #client{counter = Counter, end end. - %% XXX not a god test, because client's rate maybe bigger than global rate %% so if client' rate = infinity %% client's divisible should be true or capacity must be bigger than number of each consume @@ -769,25 +588,17 @@ to_rate(Str) -> {ok, Rate} = emqx_limiter_schema:to_rate(Str), Rate. -with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) -> - Path = [limiter, message_routing], - #{global := Global} = Cfg = emqx_config:get(Path), - Cfg2 = Cfg#{global := Modifier(Global)}, - with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case). +with_global(Modifier, BuckeTemps, Case) -> + Fun = fun(Cfg) -> + #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg), + Fun = fun({Name, BMod}, Acc) -> + Acc#{Name => BMod(BucketCfg)} + end, + Buckets = lists:foldl(Fun, #{}, BuckeTemps), + Cfg2#{bucket := Buckets} + end, -with_zone(Name, Modifier, Buckets, Case) -> - Path = [limiter, message_routing], - Cfg = emqx_config:get(Path), - with_zone(Cfg, Name, Modifier, Buckets, Case). - -with_zone(Cfg, Name, Modifier, Buckets, Case) -> - Path = [limiter, message_routing], - #{zone := ZoneCfgs, - bucket := BucketCfgs} = Cfg, - ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs), - BucketCfgs2 = apply_modifier(Buckets, BucketCfgs), - Cfg2 = Cfg#{zone := ZoneCfgs2, bucket := BucketCfgs2}, - with_config(Path, fun(_) -> Cfg2 end, Case). + with_config([limiter, message_routing], Fun, Case). with_bucket(Bucket, Modifier, Case) -> Path = [limiter, message_routing, bucket, Bucket], @@ -802,8 +613,8 @@ with_config(Path, Modifier, Case) -> NewCfg = Modifier(Cfg), ct:pal("test with config:~p~n", [NewCfg]), emqx_config:put(Path, NewCfg), - emqx_limiter_manager:restart_server(message_routing), - timer:sleep(100), + emqx_limiter_server:update_config(message_routing), + timer:sleep(500), DelayReturn = delay_return(Case), emqx_config:put(Path, Cfg), DelayReturn(). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index a2f5c3502..455cf3e43 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -405,16 +405,30 @@ t_handle_timeout_emit_stats(_) -> ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)). t_ensure_rate_limit(_) -> + %% XXX In the future, limiter should provide API for config update + Path = [limiter, bytes_in, bucket, default, per_client], + PerClient = emqx_config:get(Path), + {ok, Rate}= emqx_limiter_schema:to_rate("50MB"), + emqx_config:put(Path, PerClient#{rate := Rate}), + emqx_limiter_server:update_config(bytes_in), + timer:sleep(100), + Limiter = init_limiter(), St = st(#{limiter => Limiter}), - {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), %% must bigger than value in emqx_ratelimit_SUITE + + %% must bigger than value in emqx_ratelimit_SUITE + {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), St1 = ?ws_conn:check_limiter([{Need, bytes_in}], [], fun(_, _, S) -> S end, [], St), ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)), - ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)). + ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)), + + emqx_config:put(Path, PerClient), + emqx_limiter_server:update_config(bytes_in), + timer:sleep(100). t_parse_incoming(_) -> {Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()), @@ -558,7 +572,7 @@ ws_client(State) -> ct:fail(ws_timeout) end. -limiter_cfg() -> #{}. +limiter_cfg() -> #{bytes_in => default, message_in => default}. init_limiter() -> emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 153ec139a..6121e39b6 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,8 +493,6 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; -typename_to_spec("bucket_path()", _Mod) -> - #{type => string, example => <<"groupName.bucketName">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index a1cc42898..c0e9ad42f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -198,14 +198,15 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) -> Mod = emqx_retainer:get_backend_module(), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> - {ok, Result} = Mod:read_message(Context, Topic), + {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), deliver(Result, Context, Pid, Topic, undefined, Limiter); true -> - {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), + {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), deliver(Result, Context, Pid, Topic, NewCursor, Limiter) end. --spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. +-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> + {ok, limiter()}. deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> {ok, Limiter}; diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index a9405373e..f143734fc 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -36,7 +36,7 @@ retainer { flow_control { batch_read_number = 0 batch_deliver_number = 0 - limiter_bucket_name = retainer + limiter.batch = retainer } backend { type = built_in_database @@ -281,12 +281,11 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]), - RetainerCfg2 = RetainerCfg#{ - per_client := PerClient#{ - rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), - capacity := 1}}, - emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2), + #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]), + RetainerCfg2 = RetainerCfg#{per_client := + PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), + capacity := 1}}, + emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), emqx_limiter_manager:restart_server(shared), timer:sleep(500), @@ -296,7 +295,7 @@ t_flow_control(_) -> emqx_retainer:update_config(#{<<"flow_control">> => #{<<"batch_read_number">> => 1, <<"batch_deliver_number">> => 1, - <<"limiter_bucket_name">> => retainer}}), + <<"limiter">> => #{<<"batch">> => retainer}}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -326,7 +325,7 @@ t_flow_control(_) -> ok = emqtt:disconnect(C1), %% recover the limiter - emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg), + emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg), emqx_limiter_manager:restart_server(shared), timer:sleep(500),