diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index 34110b161..ad9f6a7cc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -50,13 +50,7 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new( - undefined | counters:countres_ref(), - undefined | index(), - rate() -) -> bucket_ref(). -new(undefined, _, _) -> - infinity; +-spec new(counters:countres_ref(), index(), rate()) -> bucket_ref(). new(Counter, Index, Rate) -> #{ counter => Counter, diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c050956ec..835661654 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -30,7 +30,8 @@ namespace/0, get_bucket_cfg_path/2, desc/1, - types/0 + types/0, + infinity_value/0, ]). -define(KILOBYTE, 1024). @@ -46,7 +47,7 @@ -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). %% the capacity of the token bucket --type capacity() :: infinity | number(). +-type capacity() :: non_neg_integer(). %% initial capacity of the token bucket -type initial() :: non_neg_integer(). -type bucket_path() :: list(atom()). @@ -207,6 +208,18 @@ types() -> %% Internal functions %%-------------------------------------------------------------------- +%% `infinity` to `infinity_value` rules: +%% 1. all infinity capacity will change to infinity_value +%% 2. if the rate of global and bucket both are `infinity`, +%% use `infinity_value` as bucket rate. see `emqx_limiter_server:get_counter_rate/2` +infinity_value() -> + %% 1 TB + 1099511627776. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + to_burst_rate(Str) -> to_rate(Str, false, true). @@ -294,7 +307,7 @@ to_quota(Str, Regex) -> {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; {match, ""} -> - {ok, infinity}; + {ok, infinity_value()}; _ -> {error, Str} end diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 9c344c752..c1956f780 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -118,18 +118,16 @@ connect(Type, BucketName) when is_atom(BucketName) -> ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), {error, config_not_found}; #{ - rate := AggrRate, - capacity := AggrSize, + rate := BucketRate, + capacity := BucketSize, per_client := #{rate := CliRate, capacity := CliSize} = Cfg } -> case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> {ok, if - CliRate < AggrRate orelse CliSize < AggrSize -> + CliRate < BucketRate orelse CliSize < BucketSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); - Bucket =:= infinity -> - emqx_htb_limiter:make_infinity_limiter(); true -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end}; @@ -372,9 +370,6 @@ longitudinal( case lists:min([ShouldAlloc, Flow, Capacity]) of Available when Available > 0 -> - %% XXX if capacity is infinity, and flow always > 0, the value in - %% counter will be overflow at some point in the future, do we need - %% to deal with this situation??? {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), @@ -491,26 +486,14 @@ make_root(#{rate := Rate, burst := Burst}) -> make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, Name), - case get_counter_rate(Conf, GlobalCfg) of - infinity -> - Rate = infinity, - Capacity = infinity, - Initial = 0, - Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), - emqx_limiter_manager:insert_bucket(Path, Ref), - CounterNum2 = CounterNum, - InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - State#{buckets := Buckets#{BucketName => Bucket}} - end; - Rate -> - #{capacity := Capacity} = Conf, - Initial = get_initial_val(Conf), - CounterNum2 = CounterNum + 1, - InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), - Bucket2 = Bucket#{counter := Counter, index := Idx}, - State2#{buckets := Buckets#{BucketName => Bucket2}} - end + Rate = get_counter_rate(Conf, GlobalCfg), + #{capacity := Capacity} = Conf, + Initial = get_initial_val(Conf), + CounterNum2 = CounterNum + 1, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> + {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), + Bucket2 = Bucket#{counter := Counter, index := Idx}, + State2#{buckets := Buckets#{BucketName => Bucket2}} end, Bucket = #{ @@ -569,8 +552,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> %% @doc find first limited node get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity -> Rate; -get_counter_rate(_Cfg, #{rate := Rate}) -> - Rate. +get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity -> + Rate; +get_counter_rate(_Cfg, _GlobalCfg) -> + emqx_limiter_schema:infinity_value(). -spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{ @@ -579,12 +564,13 @@ get_initial_val(#{ capacity := Capacity }) -> %% initial will nevner be infinity(see the emqx_limiter_schema) + InfVal = emqx_limiter_schema:infinity_value(), if Initial > 0 -> Initial; Rate =/= infinity -> erlang:min(Rate, Capacity); - Capacity =/= infinity -> + Capacity =/= InfVal -> Capacity; true -> 0