From e7dec7835fe7b8c29832f49655ff57b5115f231a Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 9 Mar 2022 18:57:02 +0800 Subject: [PATCH] feat(limiter): change zone to group and simplify config --- .../src/emqx_limiter/etc/emqx_limiter.conf | 46 ++-- .../emqx_limiter/src/emqx_limiter_manager.erl | 30 +-- .../emqx_limiter/src/emqx_limiter_schema.erl | 104 ++++++--- .../emqx_limiter/src/emqx_limiter_server.erl | 200 +++++++++++------- apps/emqx/src/emqx_schema.erl | 2 +- .../src/emqx_dashboard_swagger.erl | 2 + 6 files changed, 231 insertions(+), 153 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 4c1f1b7fb..d6edc7dd2 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -5,51 +5,49 @@ limiter { ## rate limiter for message publish bytes_in { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## rate limiter for message publish message_in { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## connection rate limiter connection { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## rate limiter for message deliver message_routing { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## Some functions that don't need to use global and zone scope, them can shared use this type shared { bucket.retainer { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } } diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 4571a59a3..8295fa5d5 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -23,8 +23,8 @@ %% API -export([ start_link/0, start_server/1, find_bucket/1 - , find_bucket/3, insert_bucket/2, insert_bucket/4 - , make_path/3, restart_server/1]). + , find_bucket/2, insert_bucket/2, insert_bucket/3 + , make_path/2, make_path/3, restart_server/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -36,6 +36,7 @@ -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type zone_name() :: emqx_limiter_schema:zone_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). +-type bucket_path() :: emqx_limiter_schema:bucket_path(). %% counter record in ets table -record(bucket, { path :: path() @@ -57,10 +58,10 @@ start_server(Type) -> restart_server(Type) -> emqx_limiter_server_sup:restart(Type). --spec find_bucket(limiter_type(), zone_name(), bucket_name()) -> +-spec find_bucket(limiter_type(), bucket_path()) -> {ok, bucket_ref()} | undefined. -find_bucket(Type, Zone, BucketId) -> - find_bucket(make_path(Type, Zone, BucketId)). +find_bucket(Type, BucketPath) -> + find_bucket(make_path(Type, BucketPath)). -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. find_bucket(Path) -> @@ -72,21 +73,26 @@ find_bucket(Path) -> end. -spec insert_bucket(limiter_type(), - zone_name(), - bucket_name(), + bucket_path(), bucket_ref()) -> boolean(). -insert_bucket(Type, Zone, BucketId, Bucket) -> - inner_insert_bucket(make_path(Type, Zone, BucketId), - Bucket). +insert_bucket(Type, BucketPath, Bucket) -> + inner_insert_bucket(make_path(Type, BucketPath), Bucket). -spec insert_bucket(path(), bucket_ref()) -> true. insert_bucket(Path, Bucket) -> inner_insert_bucket(Path, Bucket). +-spec make_path(limiter_type(), bucket_path()) -> path(). +make_path(Type, BucketPath) -> + [Type | BucketPath]. + -spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). -make_path(Type, Name, BucketId) -> - [Type, Name, BucketId]. +make_path(shared, _GroupName, BucketName) -> + [shared, BucketName]; + +make_path(Type, GroupName, BucketName) -> + [Type, GroupName, BucketName]. %%-------------------------------------------------------------------- %% @doc diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c7a5af24a..77cf9cb75 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -20,7 +20,7 @@ -export([ roots/0, fields/1, to_rate/1, to_capacity/1 , minimum_period/0, to_burst_rate/1, to_initial/1 - , namespace/0]). + , namespace/0, to_bucket_path/1, default_group_name/0]). -define(KILOBYTE, 1024). @@ -36,6 +36,7 @@ -type burst_rate() :: 0 | float(). -type capacity() :: infinity | number(). %% the capacity of the token bucket -type initial() :: non_neg_integer(). %% initial capacity of the token bucket +-type bucket_path() :: list(atom()). %% the processing strategy after the failure of the token request -type failure_strategy() :: force %% Forced to pass @@ -46,17 +47,20 @@ -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}). -typerefl_from_string({capacity/0, ?MODULE, to_capacity}). -typerefl_from_string({initial/0, ?MODULE, to_initial}). +-typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}). -reflect_type([ rate/0 , burst_rate/0 , capacity/0 , initial/0 , failure_strategy/0 + , bucket_path/0 ]). -export_type([limiter_type/0, bucket_name/0, zone_name/0]). -import(emqx_schema, [sc/2, map/2]). +-define(UNIT_TIME_IN_MS, 1000). namespace() -> limiter. @@ -67,43 +71,43 @@ fields(limiter) -> , {message_in, sc(ref(limiter_opts), #{})} , {connection, sc(ref(limiter_opts), #{})} , {message_routing, sc(ref(limiter_opts), #{})} - , {shared, sc(ref(shared_limiter_opts), + , {shared, sc(ref(shared_opts), #{description => <<"Some functions that do not need to use global and zone scope," "them can shared use this type">>})} ]; fields(limiter_opts) -> - [ {global, sc(ref(rate_burst), #{required => false})} - , {zone, sc(map("zone name", ref(rate_burst)), #{required => false})} - , {bucket, sc(map("bucket_id", ref(bucket)), - #{desc => "Token bucket"})} - ]; + fields(rate_burst) ++ %% the node global limit + [ {group, sc(map("group name", ref(group_opts)), #{})} + ]; -fields(shared_limiter_opts) -> - [{bucket, sc(map("bucket_id", ref(bucket)), - #{desc => "Token bucket"})} - ]; +fields(group_opts) -> + fields(rate_burst) ++ %% the group limite + [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} + ]; fields(rate_burst) -> - [ {rate, sc(rate(), #{})} + [ {rate, sc(rate(), #{default => "infinity"})} , {burst, sc(burst_rate(), #{default => "0/0s"})} ]; -fields(bucket) -> - [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})} - , {aggregated, sc(ref(bucket_aggregated), #{})} - , {per_client, sc(ref(client_bucket), #{})} - ]; +fields(shared_opts) -> + [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}]; -fields(bucket_aggregated) -> +fields(bucket_opts) -> [ {rate, sc(rate(), #{})} , {initial, sc(initial(), #{default => "0"})} , {capacity, sc(capacity(), #{})} + , {per_client, sc(ref(client_bucket), + #{default => #{}, + desc => "The rate limit for each user of the bucket," + "this field is not required" + })} ]; fields(client_bucket) -> - [ {rate, sc(rate(), #{})} + [ {rate, sc(rate(), #{default => "infinity"})} , {initial, sc(initial(), #{default => "0"})} %% low_water_mark add for emqx_channel and emqx_session %% both modules consume first and then check @@ -113,13 +117,14 @@ fields(client_bucket) -> #{desc => "If the remaining tokens are lower than this value, the check/consume will succeed, but it will be forced to wait for a short period of time.", default => "0"})} - , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket."})} + , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.", + default => "infinity"})} , {divisible, sc(boolean(), #{desc => "Is it possible to split the number of requested tokens?", default => false})} , {max_retry_time, sc(emqx_schema:duration(), #{ desc => "The maximum retry time when acquire failed." - , default => "5s"})} + , default => "10s"})} , {failure_strategy, sc(failure_strategy(), #{ desc => "The strategy when all the retries failed." , default => force})} @@ -132,6 +137,10 @@ minimum_period() -> to_rate(Str) -> to_rate(Str, true, false). +%% default group name for shared type limiter +default_group_name() -> + '_default'. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -145,22 +154,38 @@ to_rate(Str, CanInfinity, CanZero) -> case Tokens of ["infinity"] when CanInfinity -> {ok, infinity}; - ["0", _] when CanZero -> - {ok, 0}; %% for burst - [Quota, Interval] -> - {ok, Val} = to_capacity(Quota), - case emqx_schema:to_duration_ms(Interval) of - {ok, Ms} when Ms > 0 -> - {ok, Val * minimum_period() / Ms}; - _ -> - {error, Str} - end; + [QuotaStr] -> %% if time unit is 1s, it can be omitted + {ok, Val} = to_capacity(QuotaStr), + check_capacity(Str, Val, CanZero, + fun(Quota) -> + Quota * minimum_period() / ?UNIT_TIME_IN_MS + end); + [QuotaStr, Interval] -> + {ok, Val} = to_capacity(QuotaStr), + check_capacity(Str, Val, CanZero, + fun(Quota) -> + case emqx_schema:to_duration_ms(Interval) of + {ok, Ms} when Ms > 0 -> + {ok, Quota * minimum_period() / Ms}; + _ -> + {error, Str} + end + end); _ -> {error, Str} end. +check_capacity(_Str, 0, true, _Cont) -> + {ok, 0}; + +check_capacity(Str, 0, false, _Cont) -> + {error, Str}; + +check_capacity(_Str, Quota, _CanZero, Cont) -> + Cont(Quota). + to_capacity(Str) -> - Regex = "^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$", + Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$", to_quota(Str, Regex). to_initial(Str) -> @@ -175,9 +200,9 @@ to_quota(Str, Regex) -> Val = erlang:list_to_integer(Quota), Unit2 = string:to_lower(Unit), {ok, apply_unit(Unit2, Val)}; - {match, [Quota]} -> + {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; - {match, []} -> + {match, ""} -> {ok, infinity}; _ -> {error, Str} @@ -188,3 +213,14 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE; apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). + +to_bucket_path(Str) -> + Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")], + case Dirs of + [_Group, _Bucket] = Path -> + {ok, Path}; + [_Bucket] = Path -> + {ok, Path}; + _ -> + {error, Str} + end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index c9984cd1a..b456d7046 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -53,7 +53,9 @@ -type bucket() :: #{ id := node_id() , name := bucket_name() - , zone := zone_name() %% pointer to zone node, use for burst + %% pointer to zone node, use for burst + %% it also can use nodeId, nodeId is more direct, but nodeName is clearer + , zone := zone_name() , rate := rate() , obtained := non_neg_integer() , correction := emqx_limiter_decimal:zero_or_float() %% token correction value @@ -82,9 +84,11 @@ -type capacity() :: decimal(). -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). +-type bucket_path() :: emqx_limiter_schema:bucket_path(). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter +-define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -95,17 +99,28 @@ %% API %%-------------------------------------------------------------------- -spec connect(limiter_type(), - bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter(). -connect(Type, BucketName) when is_atom(BucketName) -> - Path = [limiter, Type, bucket, BucketName], - case emqx:get_config(Path, undefined) of - undefined -> - ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}), - throw("bucket's config not found"); - #{zone := Zone, - aggregated := #{rate := AggrRate, capacity := AggrSize}, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> - case emqx_limiter_manager:find_bucket(Type, Zone, BucketName) of + bucket_path() | #{limiter_type() => bucket_path() | undefined}) -> + emqx_htb_limiter:limiter(). +%% If no bucket path is set in config, there will be no limit +connect(_Type, undefined) -> + emqx_htb_limiter:make_infinity_limiter(undefined); + +%% Shared type can use bucket name directly +connect(shared, BucketName) when is_atom(BucketName) -> + connect(shared, [BucketName]); + +connect(Type, BucketPath) when is_list(BucketPath) -> + FullPath = get_bucket_full_cfg_path(Type, BucketPath), + case emqx:get_config(FullPath, undefined) of + undefined -> + io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]), + io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), + ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}), + throw("bucket's config not found"); + #{rate := AggrRate, + capacity := AggrSize, + per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> + case emqx_limiter_manager:find_bucket(Type, BucketPath) of {ok, Bucket} -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); @@ -115,13 +130,14 @@ connect(Type, BucketName) when is_atom(BucketName) -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; undefined -> - ?SLOG(error, #{msg => "bucket_not_found", path => Path}), + io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), + ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}), throw("invalid bucket") end end; -connect(Type, Names) -> - connect(Type, maps:get(Type, Names, default)). +connect(Type, Paths) -> + connect(Type, maps:get(Type, Paths, undefined)). -spec info(limiter_type()) -> state(). info(Type) -> @@ -374,6 +390,7 @@ maybe_burst(#{buckets := Buckets, nodes := Nodes} = State) when Burst > 0 -> %% find empty buckets and group by zone name GroupFun = fun(Id, Groups) -> + %% TODO filter undefined counter #{counter := Counter, index := Index, zone := Zone} = maps:get(Id, Nodes), @@ -426,7 +443,8 @@ dispatch_burst(GroupL, 0 -> NodeAcc; ZoneFlow -> EachFlow = ZoneFlow / erlang:length(Childs), - {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), + {Alloced, NodeAcc2} = + dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), Zone2 = Zone#{obtained := Obtained + Alloced}, NodeAcc2#{ZoneId := Zone2} end @@ -434,7 +452,8 @@ dispatch_burst(GroupL, State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. -spec dispatch_burst_to_buckets(list(node_id()), - float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. + float(), + non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> #{counter := Counter, index := Index, @@ -451,76 +470,91 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) -> -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> - case emqx:get_config([limiter, Type]) of - #{global := Global, - zone := Zone, - bucket := Bucket} -> ok; - #{bucket := Bucket} -> - Global = default_rate_burst_cfg(), - Zone = #{default => default_rate_burst_cfg()}, + Cfg = emqx:get_config([limiter, Type]), + GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg), + case GlobalCfg of + #{group := Group} -> ok; + #{bucket := _} -> + Group = make_shared_default_group(GlobalCfg), ok end, - {Factor, Root} = make_root(Global, Zone), - State2 = State#{root := Root}, - {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), - State4 = State3#{counter := counters:new(maps:size(Bucket), - [write_concurrency])}, - make_bucket(maps:to_list(Bucket), Global, Zone, Factor, NodeId, [], State4). --spec make_root(hocons:confg(), hocon:config()) -> {number(), root()}. -make_root(#{rate := Rate, burst := Burst}, Zone) -> - ZoneNum = maps:size(Zone), - Childs = lists:seq(1, ZoneNum), + {Factor, Root} = make_root(GlobalCfg), + {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type, + GlobalCfg, Factor, 1, #{}, #{}, #{}), + + State2 = State#{root := Root#{childs := maps:values(Zones)}, + zones := Zones, + nodes := Nodes, + buckets := maps:keys(DelayBuckets), + counter := counters:new(maps:size(DelayBuckets), [write_concurrency]) + }, + + lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)). + +-spec make_root(hocons:confg()) -> {number(), root()}. +make_root(#{rate := Rate, burst := Burst}) -> MiniPeriod = emqx_limiter_schema:minimum_period(), if Rate >= 1 -> {1, #{rate => Rate, burst => Burst, period => MiniPeriod, - childs => Childs, + childs => [], consumed => 0}}; true -> Factor = 1 / Rate, {Factor, #{rate => 1, burst => Burst * Factor, period => erlang:floor(Factor * MiniPeriod), - childs => Childs, + childs => [], consumed => 0}} end. -make_zone([{Name, ZoneCfg} | T], Factor, NodeId, State) -> - #{rate := Rate, burst := Burst} = ZoneCfg, - #{zones := Zones, nodes := Nodes} = State, +make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) -> + #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg, + BucketCfgs = maps:to_list(BucketMap), + + FirstChildId = NodeId + 1, + Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}), + ChildNum = maps:size(Buckets), + NextZoneId = FirstChildId + ChildNum, + Zone = #{id => NodeId, name => Name, rate => mul(Rate, Factor), burst => Burst, obtained => 0, - childs => []}, - State2 = State#{zones := Zones#{Name => NodeId}, - nodes := Nodes#{NodeId => Zone}}, - make_zone(T, Factor, NodeId + 1, State2); + childs => maps:keys(Buckets) + }, -make_zone([], _, NodeId, State2) -> - {NodeId, State2}. + make_zone(T, Type, GlobalCfg, Factor, NextZoneId, + Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets) + ); -make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Type} = State) -> - #{zone := ZoneName, - aggregated := Aggregated} = Conf, +make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) -> + {Zones, Nodes, DelayBuckets}. + +make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, ZoneName, Name), - case get_counter_rate(Conf, Zone, Global) of - infinity -> - State2 = State, - Rate = infinity, - Capacity = infinity, - Counter = undefined, - Index = undefined, - Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate), - emqx_limiter_manager:insert_bucket(Path, Ref); + case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of + infinity -> + Rate = infinity, + Capacity = infinity, + Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), + emqx_limiter_manager:insert_bucket(Path, Ref), + InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + State#{nodes := Nodes#{NodeId => Node}} + end; RawRate -> - #{capacity := Capacity} = Aggregated, - Initial = get_initial_val(Aggregated), - {Counter, Index, State2} = alloc_counter(Path, RawRate, Initial, State), - Rate = mul(RawRate, Factor) + #{capacity := Capacity} = Conf, + Initial = get_initial_val(Conf), + Rate = mul(RawRate, Factor), + + InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), + Node2 = Node#{counter := Counter, index := Idx}, + State2#{nodes := Nodes#{NodeId => Node2}} + end end, Node = #{ id => Id @@ -530,14 +564,16 @@ make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Typ , obtained => 0 , correction => 0 , capacity => Capacity - , counter => Counter - , index => Index}, + , counter => undefined + , index => undefined}, - State3 = add_zone_child(Id, Node, ZoneName, State2), - make_bucket(T, Global, Zone, Factor, Id + 1, [Id | Buckets], State3); + DelayInit = ?CURRYING(Node, InitFun), -make_bucket([], _, _, _, _, Buckets, State) -> - State#{buckets := Buckets}. + make_bucket(T, + Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit}); + +make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) -> + DelayBuckets. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> {counters:counters_ref(), pos_integer(), state()}. @@ -558,20 +594,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> emqx_limiter_manager:insert_bucket(Path, Ref), {Counter, Index, State}. --spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state(). -add_zone_child(NodeId, Bucket, Name, #{zones := Zones, nodes := Nodes} = State) -> - ZoneId = maps:get(Name, Zones), - #{childs := Childs} = Zone = maps:get(ZoneId, Nodes), - Nodes2 = Nodes#{ZoneId => Zone#{childs := [NodeId | Childs]}, - NodeId => Bucket}, - State#{nodes := Nodes2}. - %% @doc find first limited node -get_counter_rate(#{zone := ZoneName, - aggregated := Cfg}, ZoneCfg, Global) -> - Zone = maps:get(ZoneName, ZoneCfg), +get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) -> Search = lists:search(fun(E) -> is_limited(E) end, - [Cfg, Zone, Global]), + [BucketCfg, ZoneCfg, GlobalCfg]), case Search of {value, #{rate := Rate}} -> Rate; @@ -585,6 +611,7 @@ is_limited(#{rate := Rate, capacity := Capacity}) -> is_limited(#{rate := Rate}) -> Rate =/= infinity. +-spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{initial := Initial, rate := Rate, capacity := Capacity}) -> @@ -599,5 +626,14 @@ get_initial_val(#{initial := Initial, 0 end. -default_rate_burst_cfg() -> - #{rate => infinity, burst => 0}. +-spec make_shared_default_group(hocons:config()) -> honcs:config(). +make_shared_default_group(Cfg) -> + GroupName = emqx_limiter_schema:default_group_name(), + #{GroupName => Cfg#{rate => infinity, burst => 0}}. + +-spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()). +get_bucket_full_cfg_path(shared, [BucketName]) -> + [limiter, shared, bucket, BucketName]; + +get_bucket_full_cfg_path(Type, [GroupName, BucketName]) -> + [limiter, Type, group, GroupName, bucket, BucketName]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 592d6983a..e4da8a5cc 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit bucket's name", atom()), #{default => #{}})} + sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6121e39b6..153ec139a 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; +typename_to_spec("bucket_path()", _Mod) -> + #{type => string, example => <<"groupName.bucketName">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod),