fix(limiter): fix precision issue

When the global rate is less than 1/s, the bad code produces a long period, this will make the rate not correct
This commit is contained in:
firest 2022-06-09 10:28:24 +08:00
parent 0aab063dd5
commit e01f8ecccb
4 changed files with 31 additions and 42 deletions

View File

@ -374,7 +374,7 @@ return_pause(infinity, PauseType, Fun, Diff, Limiter) ->
%% workaround when emqx_limiter_server's rate is infinity
{PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter};
return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
Val = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate),
Val = erlang:round(Diff * emqx_limiter_schema:default_period() / Rate),
Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
{PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
@ -408,5 +408,5 @@ may_return_or_pause(_, Limiter) ->
%% @doc apply the elapsed time to the limiter
apply_elapsed_time(Rate, Elapsed, Tokens, Capacity) ->
Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:minimum_period()),
Inc = floor_div(mul(Elapsed, Rate), emqx_limiter_schema:default_period()),
erlang:min(add(Tokens, Inc), Capacity).

View File

@ -24,7 +24,7 @@
fields/1,
to_rate/1,
to_capacity/1,
minimum_period/0,
default_period/0,
to_burst_rate/1,
to_initial/1,
namespace/0,
@ -191,8 +191,8 @@ desc(client_bucket) ->
desc(_) ->
undefined.
%% minimum period is 100ms
minimum_period() ->
%% default period is 100ms
default_period() ->
100.
to_rate(Str) ->
@ -235,7 +235,7 @@ to_rate(Str, CanInfinity, CanZero) ->
%% if time unit is 1s, it can be omitted
{match, [QuotaStr]} ->
Fun = fun(Quota) ->
{ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS}
{ok, Quota * default_period() / ?UNIT_TIME_IN_MS}
end,
to_capacity(QuotaStr, Str, CanZero, Fun);
{match, [QuotaStr, TimeVal, TimeUnit]} ->
@ -250,7 +250,7 @@ to_rate(Str, CanInfinity, CanZero) ->
try
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, Quota * minimum_period() / Ms};
{ok, Quota * default_period() / Ms};
{ok, 0} when CanZero ->
{ok, 0};
_ ->

View File

@ -57,13 +57,13 @@
burst := rate(),
%% token generation interval(second)
period := pos_integer(),
consumed := non_neg_integer()
produced := float()
}.
-type bucket() :: #{
name := bucket_name(),
rate := rate(),
obtained := non_neg_integer(),
obtained := float(),
%% token correction value
correction := emqx_limiter_decimal:zero_or_float(),
capacity := capacity(),
@ -314,26 +314,26 @@ oscillation(
root := #{
rate := Flow,
period := Interval,
consumed := Consumed
produced := Produced
} = Root,
buckets := Buckets
} = State
) ->
oscillate(Interval),
Ordereds = get_ordered_buckets(Buckets),
{Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets),
{Alloced, Buckets2} = transverse(Ordereds, Flow, 0.0, Buckets),
maybe_burst(State#{
buckets := Buckets2,
root := Root#{consumed := Consumed + Alloced}
root := Root#{produced := Produced + Alloced}
}).
%% @doc horizontal spread
-spec transverse(
list(bucket()),
flow(),
non_neg_integer(),
float(),
buckets()
) -> {non_neg_integer(), buckets()}.
) -> {float(), buckets()}.
transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 ->
{BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets),
InFlow2 = sub(InFlow, BucketAlloced),
@ -344,7 +344,7 @@ transverse(_, _, Alloced, Buckets) ->
%% @doc vertical spread
-spec longitudinal(bucket(), flow(), buckets()) ->
{non_neg_integer(), buckets()}.
{float(), buckets()}.
longitudinal(
#{
name := Name,
@ -381,7 +381,7 @@ longitudinal(
{Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket),
counters:add(Counter, Index, Inc),
{Inc, Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}};
{Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}};
_ ->
{0, Buckets}
end;
@ -431,11 +431,11 @@ dispatch_burst([], _, State) ->
dispatch_burst(
Empties,
InFlow,
#{root := #{consumed := Consumed} = Root, buckets := Buckets} = State
#{root := #{produced := Produced} = Root, buckets := Buckets} = State
) ->
EachFlow = InFlow / erlang:length(Empties),
{Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets),
State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}.
State#{root := Root#{produced := Produced + Alloced}, buckets := Buckets2}.
-spec dispatch_burst_to_buckets(
list(bucket()),
@ -473,8 +473,8 @@ init_tree(Type, #{bucket := Buckets} = Cfg) ->
buckets => #{}
},
{Factor, Root} = make_root(Cfg),
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []),
Root = make_root(Cfg),
{CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []),
State2 = State#{
root := Root,
@ -483,25 +483,16 @@ init_tree(Type, #{bucket := Buckets} = Cfg) ->
lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets).
-spec make_root(hocons:confg()) -> {number(), root()}.
make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 ->
{1, #{
-spec make_root(hocons:confg()) -> root().
make_root(#{rate := Rate, burst := Burst}) ->
#{
rate => Rate,
burst => Burst,
period => emqx_limiter_schema:minimum_period(),
consumed => 0
}};
make_root(#{rate := Rate, burst := Burst}) ->
MiniPeriod = emqx_limiter_schema:minimum_period(),
Factor = 1 / Rate,
{Factor, #{
rate => 1,
burst => Burst * Factor,
period => erlang:floor(Factor * MiniPeriod),
consumed => 0
}}.
period => emqx_limiter_schema:default_period(),
produced => 0.0
}.
make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) ->
make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) ->
Path = emqx_limiter_manager:make_path(Type, Name),
case get_counter_rate(Conf, GlobalCfg) of
infinity ->
@ -514,13 +505,12 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
State#{buckets := Buckets#{BucketName => Bucket}}
end;
RawRate ->
Rate ->
#{capacity := Capacity} = Conf,
Initial = get_initial_val(Conf),
Rate = mul(RawRate, Factor),
CounterNum2 = CounterNum + 1,
InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) ->
{Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State),
{Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State),
Bucket2 = Bucket#{counter := Counter, index := Idx},
State2#{buckets := Buckets#{BucketName => Bucket2}}
end
@ -542,11 +532,10 @@ make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBucket
T,
Type,
GlobalCfg,
Factor,
CounterNum2,
[DelayInit | DelayBuckets]
);
make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) ->
make_bucket([], _Type, _Global, CounterNum, DelayBuckets) ->
{CounterNum, DelayBuckets}.
-spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) ->

View File

@ -646,7 +646,7 @@ client_loop(
} = State
) ->
Now = ?NOW,
Period = emqx_limiter_schema:minimum_period(),
Period = emqx_limiter_schema:default_period(),
MinPeriod = erlang:ceil(0.25 * Period),
if
Now >= EndTime ->