diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index a1115ad3a..d7a959e40 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -374,7 +374,7 @@ return_pause(infinity, PauseType, Fun, Diff, Limiter) -> %% workaround when emqx_limiter_server's rate is infinity {PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter}; return_pause(Rate, PauseType, Fun, Diff, Limiter) -> - Val = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate), + Val = erlang:round(Diff * emqx_limiter_schema:default_period() / Rate), Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. @@ -408,5 +408,5 @@ may_return_or_pause(_, Limiter) -> %% @doc apply the elapsed time to the limiter apply_elapsed_time(Rate, Elapsed, Tokens, Capacity) -> - Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:minimum_period()), + Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:default_period()), erlang:min(add(Tokens, Inc), Capacity). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 023f9124f..afb46498a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -24,7 +24,7 @@ fields/1, to_rate/1, to_capacity/1, - minimum_period/0, + default_period/0, to_burst_rate/1, to_initial/1, namespace/0, @@ -191,8 +191,8 @@ desc(client_bucket) -> desc(_) -> undefined. -%% minimum period is 100ms -minimum_period() -> +%% default period is 100ms +default_period() -> 100. to_rate(Str) -> @@ -235,7 +235,7 @@ to_rate(Str, CanInfinity, CanZero) -> %% if time unit is 1s, it can be omitted {match, [QuotaStr]} -> Fun = fun(Quota) -> - {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} + {ok, Quota * default_period() / ?UNIT_TIME_IN_MS} end, to_capacity(QuotaStr, Str, CanZero, Fun); {match, [QuotaStr, TimeVal, TimeUnit]} -> @@ -250,7 +250,7 @@ to_rate(Str, CanInfinity, CanZero) -> try case emqx_schema:to_duration_ms(Interval) of {ok, Ms} when Ms > 0 -> - {ok, Quota * minimum_period() / Ms}; + {ok, Quota * default_period() / Ms}; {ok, 0} when CanZero -> {ok, 0}; _ -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 4ae6308f0..43211018c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -57,13 +57,13 @@ burst := rate(), %% token generation interval(second) period := pos_integer(), - consumed := non_neg_integer() + produced := float() }. -type bucket() :: #{ name := bucket_name(), rate := rate(), - obtained := non_neg_integer(), + obtained := float(), %% token correction value correction := emqx_limiter_decimal:zero_or_float(), capacity := capacity(), @@ -314,26 +314,26 @@ oscillation( root := #{ rate := Flow, period := Interval, - consumed := Consumed + produced := Produced } = Root, buckets := Buckets } = State ) -> oscillate(Interval), Ordereds = get_ordered_buckets(Buckets), - {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets), + {Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets), maybe_burst(State#{ buckets := Buckets2, - root := Root#{consumed := Consumed + Alloced} + root := Root#{produced := Produced + Alloced} }). %% @doc horizontal spread -spec transverse( list(bucket()), flow(), - non_neg_integer(), + float(), buckets() -) -> {non_neg_integer(), buckets()}. +) -> {float(), buckets()}. transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 -> {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets), InFlow2 = sub(InFlow, BucketAlloced), @@ -344,7 +344,7 @@ transverse(_, _, Alloced, Buckets) -> %% @doc vertical spread -spec longitudinal(bucket(), flow(), buckets()) -> - {non_neg_integer(), buckets()}. + {float(), buckets()}. longitudinal( #{ name := Name, @@ -381,7 +381,7 @@ longitudinal( {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), - {Inc, Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}}; + {Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}}; _ -> {0, Buckets} end; @@ -431,11 +431,11 @@ dispatch_burst([], _, State) -> dispatch_burst( Empties, InFlow, - #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State + #{root := #{produced := Produced} = Root, buckets := Buckets} = State ) -> EachFlow = InFlow / erlang:length(Empties), {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), - State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}. + State#{root := Root#{produced := Produced + Alloced}, buckets := Buckets2}. -spec dispatch_burst_to_buckets( list(bucket()), @@ -473,8 +473,8 @@ init_tree(Type, #{bucket := Buckets} = Cfg) -> buckets => #{} }, - {Factor, Root} = make_root(Cfg), - {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), + Root = make_root(Cfg), + {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []), State2 = State#{ root := Root, @@ -483,25 +483,16 @@ init_tree(Type, #{bucket := Buckets} = Cfg) -> lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets). --spec make_root(hocons:confg()) -> {number(), root()}. -make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 -> - {1, #{ +-spec make_root(hocons:confg()) -> root(). +make_root(#{rate := Rate, burst := Burst}) -> + #{ rate => Rate, burst => Burst, - period => emqx_limiter_schema:minimum_period(), - consumed => 0 - }}; -make_root(#{rate := Rate, burst := Burst}) -> - MiniPeriod = emqx_limiter_schema:minimum_period(), - Factor = 1 / Rate, - {Factor, #{ - rate => 1, - burst => Burst * Factor, - period => erlang:floor(Factor * MiniPeriod), - consumed => 0 - }}. + period => emqx_limiter_schema:default_period(), + produced => 0.0 + }. -make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) -> +make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, Name), case get_counter_rate(Conf, GlobalCfg) of infinity -> @@ -514,13 +505,12 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> State#{buckets := Buckets#{BucketName => Bucket}} end; - RawRate -> + Rate -> #{capacity := Capacity} = Conf, Initial = get_initial_val(Conf), - Rate = mul(RawRate, Factor), CounterNum2 = CounterNum + 1, InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), + {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), Bucket2 = Bucket#{counter := Counter, index := Idx}, State2#{buckets := Buckets#{BucketName => Bucket2}} end @@ -542,11 +532,10 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket T, Type, GlobalCfg, - Factor, CounterNum2, [DelayInit | DelayBuckets] ); -make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) -> +make_bucket([], _Type, _Global, CounterNum, DelayBuckets) -> {CounterNum, DelayBuckets}. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 6b65d090e..0b6f7c94e 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -646,7 +646,7 @@ client_loop( } = State ) -> Now = ?NOW, - Period = emqx_limiter_schema:minimum_period(), + Period = emqx_limiter_schema:default_period(), MinPeriod = erlang:ceil(0.25 * Period), if Now >= EndTime ->