From e7dec7835fe7b8c29832f49655ff57b5115f231a Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 9 Mar 2022 18:57:02 +0800 Subject: [PATCH 1/4] feat(limiter): change zone to group and simplify config --- .../src/emqx_limiter/etc/emqx_limiter.conf | 46 ++-- .../emqx_limiter/src/emqx_limiter_manager.erl | 30 +-- .../emqx_limiter/src/emqx_limiter_schema.erl | 104 ++++++--- .../emqx_limiter/src/emqx_limiter_server.erl | 200 +++++++++++------- apps/emqx/src/emqx_schema.erl | 2 +- .../src/emqx_dashboard_swagger.erl | 2 + 6 files changed, 231 insertions(+), 153 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 4c1f1b7fb..d6edc7dd2 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -5,51 +5,49 @@ limiter { ## rate limiter for message publish bytes_in { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## rate limiter for message publish message_in { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## connection rate limiter connection { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## rate limiter for message deliver message_routing { - bucket.default { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + group.default { + bucket.default { + rate = infinity + capacity = infinity + } } } ## Some functions that don't need to use global and zone scope, them can shared use this type shared { bucket.retainer { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } } diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 4571a59a3..8295fa5d5 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -23,8 +23,8 @@ %% API -export([ start_link/0, start_server/1, find_bucket/1 - , find_bucket/3, insert_bucket/2, insert_bucket/4 - , make_path/3, restart_server/1]). + , find_bucket/2, insert_bucket/2, insert_bucket/3 + , make_path/2, make_path/3, restart_server/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -36,6 +36,7 @@ -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type zone_name() :: emqx_limiter_schema:zone_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). +-type bucket_path() :: emqx_limiter_schema:bucket_path(). %% counter record in ets table -record(bucket, { path :: path() @@ -57,10 +58,10 @@ start_server(Type) -> restart_server(Type) -> emqx_limiter_server_sup:restart(Type). --spec find_bucket(limiter_type(), zone_name(), bucket_name()) -> +-spec find_bucket(limiter_type(), bucket_path()) -> {ok, bucket_ref()} | undefined. -find_bucket(Type, Zone, BucketId) -> - find_bucket(make_path(Type, Zone, BucketId)). +find_bucket(Type, BucketPath) -> + find_bucket(make_path(Type, BucketPath)). -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. find_bucket(Path) -> @@ -72,21 +73,26 @@ find_bucket(Path) -> end. -spec insert_bucket(limiter_type(), - zone_name(), - bucket_name(), + bucket_path(), bucket_ref()) -> boolean(). -insert_bucket(Type, Zone, BucketId, Bucket) -> - inner_insert_bucket(make_path(Type, Zone, BucketId), - Bucket). +insert_bucket(Type, BucketPath, Bucket) -> + inner_insert_bucket(make_path(Type, BucketPath), Bucket). -spec insert_bucket(path(), bucket_ref()) -> true. insert_bucket(Path, Bucket) -> inner_insert_bucket(Path, Bucket). +-spec make_path(limiter_type(), bucket_path()) -> path(). +make_path(Type, BucketPath) -> + [Type | BucketPath]. + -spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). -make_path(Type, Name, BucketId) -> - [Type, Name, BucketId]. +make_path(shared, _GroupName, BucketName) -> + [shared, BucketName]; + +make_path(Type, GroupName, BucketName) -> + [Type, GroupName, BucketName]. %%-------------------------------------------------------------------- %% @doc diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c7a5af24a..77cf9cb75 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -20,7 +20,7 @@ -export([ roots/0, fields/1, to_rate/1, to_capacity/1 , minimum_period/0, to_burst_rate/1, to_initial/1 - , namespace/0]). + , namespace/0, to_bucket_path/1, default_group_name/0]). -define(KILOBYTE, 1024). @@ -36,6 +36,7 @@ -type burst_rate() :: 0 | float(). -type capacity() :: infinity | number(). %% the capacity of the token bucket -type initial() :: non_neg_integer(). %% initial capacity of the token bucket +-type bucket_path() :: list(atom()). %% the processing strategy after the failure of the token request -type failure_strategy() :: force %% Forced to pass @@ -46,17 +47,20 @@ -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}). -typerefl_from_string({capacity/0, ?MODULE, to_capacity}). -typerefl_from_string({initial/0, ?MODULE, to_initial}). +-typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}). -reflect_type([ rate/0 , burst_rate/0 , capacity/0 , initial/0 , failure_strategy/0 + , bucket_path/0 ]). -export_type([limiter_type/0, bucket_name/0, zone_name/0]). -import(emqx_schema, [sc/2, map/2]). +-define(UNIT_TIME_IN_MS, 1000). namespace() -> limiter. @@ -67,43 +71,43 @@ fields(limiter) -> , {message_in, sc(ref(limiter_opts), #{})} , {connection, sc(ref(limiter_opts), #{})} , {message_routing, sc(ref(limiter_opts), #{})} - , {shared, sc(ref(shared_limiter_opts), + , {shared, sc(ref(shared_opts), #{description => <<"Some functions that do not need to use global and zone scope," "them can shared use this type">>})} ]; fields(limiter_opts) -> - [ {global, sc(ref(rate_burst), #{required => false})} - , {zone, sc(map("zone name", ref(rate_burst)), #{required => false})} - , {bucket, sc(map("bucket_id", ref(bucket)), - #{desc => "Token bucket"})} - ]; + fields(rate_burst) ++ %% the node global limit + [ {group, sc(map("group name", ref(group_opts)), #{})} + ]; -fields(shared_limiter_opts) -> - [{bucket, sc(map("bucket_id", ref(bucket)), - #{desc => "Token bucket"})} - ]; +fields(group_opts) -> + fields(rate_burst) ++ %% the group limite + [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} + ]; fields(rate_burst) -> - [ {rate, sc(rate(), #{})} + [ {rate, sc(rate(), #{default => "infinity"})} , {burst, sc(burst_rate(), #{default => "0/0s"})} ]; -fields(bucket) -> - [ {zone, sc(atom(), #{desc => "The bucket's zone", default => default})} - , {aggregated, sc(ref(bucket_aggregated), #{})} - , {per_client, sc(ref(client_bucket), #{})} - ]; +fields(shared_opts) -> + [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}]; -fields(bucket_aggregated) -> +fields(bucket_opts) -> [ {rate, sc(rate(), #{})} , {initial, sc(initial(), #{default => "0"})} , {capacity, sc(capacity(), #{})} + , {per_client, sc(ref(client_bucket), + #{default => #{}, + desc => "The rate limit for each user of the bucket," + "this field is not required" + })} ]; fields(client_bucket) -> - [ {rate, sc(rate(), #{})} + [ {rate, sc(rate(), #{default => "infinity"})} , {initial, sc(initial(), #{default => "0"})} %% low_water_mark add for emqx_channel and emqx_session %% both modules consume first and then check @@ -113,13 +117,14 @@ fields(client_bucket) -> #{desc => "If the remaining tokens are lower than this value, the check/consume will succeed, but it will be forced to wait for a short period of time.", default => "0"})} - , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket."})} + , {capacity, sc(capacity(), #{desc => "The capacity of the token bucket.", + default => "infinity"})} , {divisible, sc(boolean(), #{desc => "Is it possible to split the number of requested tokens?", default => false})} , {max_retry_time, sc(emqx_schema:duration(), #{ desc => "The maximum retry time when acquire failed." - , default => "5s"})} + , default => "10s"})} , {failure_strategy, sc(failure_strategy(), #{ desc => "The strategy when all the retries failed." , default => force})} @@ -132,6 +137,10 @@ minimum_period() -> to_rate(Str) -> to_rate(Str, true, false). +%% default group name for shared type limiter +default_group_name() -> + '_default'. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -145,22 +154,38 @@ to_rate(Str, CanInfinity, CanZero) -> case Tokens of ["infinity"] when CanInfinity -> {ok, infinity}; - ["0", _] when CanZero -> - {ok, 0}; %% for burst - [Quota, Interval] -> - {ok, Val} = to_capacity(Quota), - case emqx_schema:to_duration_ms(Interval) of - {ok, Ms} when Ms > 0 -> - {ok, Val * minimum_period() / Ms}; - _ -> - {error, Str} - end; + [QuotaStr] -> %% if time unit is 1s, it can be omitted + {ok, Val} = to_capacity(QuotaStr), + check_capacity(Str, Val, CanZero, + fun(Quota) -> + Quota * minimum_period() / ?UNIT_TIME_IN_MS + end); + [QuotaStr, Interval] -> + {ok, Val} = to_capacity(QuotaStr), + check_capacity(Str, Val, CanZero, + fun(Quota) -> + case emqx_schema:to_duration_ms(Interval) of + {ok, Ms} when Ms > 0 -> + {ok, Quota * minimum_period() / Ms}; + _ -> + {error, Str} + end + end); _ -> {error, Str} end. +check_capacity(_Str, 0, true, _Cont) -> + {ok, 0}; + +check_capacity(Str, 0, false, _Cont) -> + {error, Str}; + +check_capacity(_Str, Quota, _CanZero, Cont) -> + Cont(Quota). + to_capacity(Str) -> - Regex = "^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$", + Regex = "^\s*(?:([0-9]+)([a-zA-z]*))|infinity\s*$", to_quota(Str, Regex). to_initial(Str) -> @@ -175,9 +200,9 @@ to_quota(Str, Regex) -> Val = erlang:list_to_integer(Quota), Unit2 = string:to_lower(Unit), {ok, apply_unit(Unit2, Val)}; - {match, [Quota]} -> + {match, [Quota, ""]} -> {ok, erlang:list_to_integer(Quota)}; - {match, []} -> + {match, ""} -> {ok, infinity}; _ -> {error, Str} @@ -188,3 +213,14 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE; apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). + +to_bucket_path(Str) -> + Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")], + case Dirs of + [_Group, _Bucket] = Path -> + {ok, Path}; + [_Bucket] = Path -> + {ok, Path}; + _ -> + {error, Str} + end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index c9984cd1a..b456d7046 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -53,7 +53,9 @@ -type bucket() :: #{ id := node_id() , name := bucket_name() - , zone := zone_name() %% pointer to zone node, use for burst + %% pointer to zone node, use for burst + %% it also can use nodeId, nodeId is more direct, but nodeName is clearer + , zone := zone_name() , rate := rate() , obtained := non_neg_integer() , correction := emqx_limiter_decimal:zero_or_float() %% token correction value @@ -82,9 +84,11 @@ -type capacity() :: decimal(). -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). +-type bucket_path() :: emqx_limiter_schema:bucket_path(). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter +-define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -95,17 +99,28 @@ %% API %%-------------------------------------------------------------------- -spec connect(limiter_type(), - bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter(). -connect(Type, BucketName) when is_atom(BucketName) -> - Path = [limiter, Type, bucket, BucketName], - case emqx:get_config(Path, undefined) of - undefined -> - ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}), - throw("bucket's config not found"); - #{zone := Zone, - aggregated := #{rate := AggrRate, capacity := AggrSize}, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> - case emqx_limiter_manager:find_bucket(Type, Zone, BucketName) of + bucket_path() | #{limiter_type() => bucket_path() | undefined}) -> + emqx_htb_limiter:limiter(). +%% If no bucket path is set in config, there will be no limit +connect(_Type, undefined) -> + emqx_htb_limiter:make_infinity_limiter(undefined); + +%% Shared type can use bucket name directly +connect(shared, BucketName) when is_atom(BucketName) -> + connect(shared, [BucketName]); + +connect(Type, BucketPath) when is_list(BucketPath) -> + FullPath = get_bucket_full_cfg_path(Type, BucketPath), + case emqx:get_config(FullPath, undefined) of + undefined -> + io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]), + io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), + ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}), + throw("bucket's config not found"); + #{rate := AggrRate, + capacity := AggrSize, + per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> + case emqx_limiter_manager:find_bucket(Type, BucketPath) of {ok, Bucket} -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); @@ -115,13 +130,14 @@ connect(Type, BucketName) when is_atom(BucketName) -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; undefined -> - ?SLOG(error, #{msg => "bucket_not_found", path => Path}), + io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), + ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}), throw("invalid bucket") end end; -connect(Type, Names) -> - connect(Type, maps:get(Type, Names, default)). +connect(Type, Paths) -> + connect(Type, maps:get(Type, Paths, undefined)). -spec info(limiter_type()) -> state(). info(Type) -> @@ -374,6 +390,7 @@ maybe_burst(#{buckets := Buckets, nodes := Nodes} = State) when Burst > 0 -> %% find empty buckets and group by zone name GroupFun = fun(Id, Groups) -> + %% TODO filter undefined counter #{counter := Counter, index := Index, zone := Zone} = maps:get(Id, Nodes), @@ -426,7 +443,8 @@ dispatch_burst(GroupL, 0 -> NodeAcc; ZoneFlow -> EachFlow = ZoneFlow / erlang:length(Childs), - {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), + {Alloced, NodeAcc2} = + dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), Zone2 = Zone#{obtained := Obtained + Alloced}, NodeAcc2#{ZoneId := Zone2} end @@ -434,7 +452,8 @@ dispatch_burst(GroupL, State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. -spec dispatch_burst_to_buckets(list(node_id()), - float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. + float(), + non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> #{counter := Counter, index := Index, @@ -451,76 +470,91 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) -> -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> - case emqx:get_config([limiter, Type]) of - #{global := Global, - zone := Zone, - bucket := Bucket} -> ok; - #{bucket := Bucket} -> - Global = default_rate_burst_cfg(), - Zone = #{default => default_rate_burst_cfg()}, + Cfg = emqx:get_config([limiter, Type]), + GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg), + case GlobalCfg of + #{group := Group} -> ok; + #{bucket := _} -> + Group = make_shared_default_group(GlobalCfg), ok end, - {Factor, Root} = make_root(Global, Zone), - State2 = State#{root := Root}, - {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), - State4 = State3#{counter := counters:new(maps:size(Bucket), - [write_concurrency])}, - make_bucket(maps:to_list(Bucket), Global, Zone, Factor, NodeId, [], State4). --spec make_root(hocons:confg(), hocon:config()) -> {number(), root()}. -make_root(#{rate := Rate, burst := Burst}, Zone) -> - ZoneNum = maps:size(Zone), - Childs = lists:seq(1, ZoneNum), + {Factor, Root} = make_root(GlobalCfg), + {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type, + GlobalCfg, Factor, 1, #{}, #{}, #{}), + + State2 = State#{root := Root#{childs := maps:values(Zones)}, + zones := Zones, + nodes := Nodes, + buckets := maps:keys(DelayBuckets), + counter := counters:new(maps:size(DelayBuckets), [write_concurrency]) + }, + + lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)). + +-spec make_root(hocons:confg()) -> {number(), root()}. +make_root(#{rate := Rate, burst := Burst}) -> MiniPeriod = emqx_limiter_schema:minimum_period(), if Rate >= 1 -> {1, #{rate => Rate, burst => Burst, period => MiniPeriod, - childs => Childs, + childs => [], consumed => 0}}; true -> Factor = 1 / Rate, {Factor, #{rate => 1, burst => Burst * Factor, period => erlang:floor(Factor * MiniPeriod), - childs => Childs, + childs => [], consumed => 0}} end. -make_zone([{Name, ZoneCfg} | T], Factor, NodeId, State) -> - #{rate := Rate, burst := Burst} = ZoneCfg, - #{zones := Zones, nodes := Nodes} = State, +make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) -> + #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg, + BucketCfgs = maps:to_list(BucketMap), + + FirstChildId = NodeId + 1, + Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}), + ChildNum = maps:size(Buckets), + NextZoneId = FirstChildId + ChildNum, + Zone = #{id => NodeId, name => Name, rate => mul(Rate, Factor), burst => Burst, obtained => 0, - childs => []}, - State2 = State#{zones := Zones#{Name => NodeId}, - nodes := Nodes#{NodeId => Zone}}, - make_zone(T, Factor, NodeId + 1, State2); + childs => maps:keys(Buckets) + }, -make_zone([], _, NodeId, State2) -> - {NodeId, State2}. + make_zone(T, Type, GlobalCfg, Factor, NextZoneId, + Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets) + ); -make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Type} = State) -> - #{zone := ZoneName, - aggregated := Aggregated} = Conf, +make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) -> + {Zones, Nodes, DelayBuckets}. + +make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) -> Path = emqx_limiter_manager:make_path(Type, ZoneName, Name), - case get_counter_rate(Conf, Zone, Global) of - infinity -> - State2 = State, - Rate = infinity, - Capacity = infinity, - Counter = undefined, - Index = undefined, - Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate), - emqx_limiter_manager:insert_bucket(Path, Ref); + case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of + infinity -> + Rate = infinity, + Capacity = infinity, + Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), + emqx_limiter_manager:insert_bucket(Path, Ref), + InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + State#{nodes := Nodes#{NodeId => Node}} + end; RawRate -> - #{capacity := Capacity} = Aggregated, - Initial = get_initial_val(Aggregated), - {Counter, Index, State2} = alloc_counter(Path, RawRate, Initial, State), - Rate = mul(RawRate, Factor) + #{capacity := Capacity} = Conf, + Initial = get_initial_val(Conf), + Rate = mul(RawRate, Factor), + + InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), + Node2 = Node#{counter := Counter, index := Idx}, + State2#{nodes := Nodes#{NodeId => Node2}} + end end, Node = #{ id => Id @@ -530,14 +564,16 @@ make_bucket([{Name, Conf} | T], Global, Zone, Factor, Id, Buckets, #{type := Typ , obtained => 0 , correction => 0 , capacity => Capacity - , counter => Counter - , index => Index}, + , counter => undefined + , index => undefined}, - State3 = add_zone_child(Id, Node, ZoneName, State2), - make_bucket(T, Global, Zone, Factor, Id + 1, [Id | Buckets], State3); + DelayInit = ?CURRYING(Node, InitFun), -make_bucket([], _, _, _, _, Buckets, State) -> - State#{buckets := Buckets}. + make_bucket(T, + Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit}); + +make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) -> + DelayBuckets. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> {counters:counters_ref(), pos_integer(), state()}. @@ -558,20 +594,10 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> emqx_limiter_manager:insert_bucket(Path, Ref), {Counter, Index, State}. --spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state(). -add_zone_child(NodeId, Bucket, Name, #{zones := Zones, nodes := Nodes} = State) -> - ZoneId = maps:get(Name, Zones), - #{childs := Childs} = Zone = maps:get(ZoneId, Nodes), - Nodes2 = Nodes#{ZoneId => Zone#{childs := [NodeId | Childs]}, - NodeId => Bucket}, - State#{nodes := Nodes2}. - %% @doc find first limited node -get_counter_rate(#{zone := ZoneName, - aggregated := Cfg}, ZoneCfg, Global) -> - Zone = maps:get(ZoneName, ZoneCfg), +get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) -> Search = lists:search(fun(E) -> is_limited(E) end, - [Cfg, Zone, Global]), + [BucketCfg, ZoneCfg, GlobalCfg]), case Search of {value, #{rate := Rate}} -> Rate; @@ -585,6 +611,7 @@ is_limited(#{rate := Rate, capacity := Capacity}) -> is_limited(#{rate := Rate}) -> Rate =/= infinity. +-spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{initial := Initial, rate := Rate, capacity := Capacity}) -> @@ -599,5 +626,14 @@ get_initial_val(#{initial := Initial, 0 end. -default_rate_burst_cfg() -> - #{rate => infinity, burst => 0}. +-spec make_shared_default_group(hocons:config()) -> honcs:config(). +make_shared_default_group(Cfg) -> + GroupName = emqx_limiter_schema:default_group_name(), + #{GroupName => Cfg#{rate => infinity, burst => 0}}. + +-spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()). +get_bucket_full_cfg_path(shared, [BucketName]) -> + [limiter, shared, bucket, BucketName]; + +get_bucket_full_cfg_path(Type, [GroupName, BucketName]) -> + [limiter, Type, group, GroupName, bucket, BucketName]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 592d6983a..e4da8a5cc 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit bucket's name", atom()), #{default => #{}})} + sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6121e39b6..153ec139a 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; +typename_to_spec("bucket_path()", _Mod) -> + #{type => string, example => <<"groupName.bucketName">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), From 32030c8369a0850d5081b68558e8d0ee9901e4bf Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 11 Mar 2022 18:04:35 +0800 Subject: [PATCH 2/4] feat(limiter): remove the group(zone) level --- .../src/emqx_limiter/etc/emqx_limiter.conf | 36 +- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 13 +- .../src/emqx_limiter_container.erl | 8 +- .../emqx_limiter/src/emqx_limiter_manager.erl | 30 +- .../emqx_limiter/src/emqx_limiter_schema.erl | 55 +-- .../emqx_limiter/src/emqx_limiter_server.erl | 400 ++++++------------ apps/emqx/src/emqx_schema.erl | 2 +- apps/emqx_retainer/etc/emqx_retainer.conf | 4 +- .../src/emqx_retainer_dispatcher.erl | 8 +- .../src/emqx_retainer_schema.erl | 2 +- 10 files changed, 190 insertions(+), 368 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index d6edc7dd2..a29903205 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -5,46 +5,38 @@ limiter { ## rate limiter for message publish bytes_in { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## rate limiter for message publish message_in { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## connection rate limiter connection { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## rate limiter for message deliver message_routing { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } - ## Some functions that don't need to use global and zone scope, them can shared use this type - shared { + ## rate limiter for internal batch operation + batch { bucket.retainer { rate = infinity capacity = infinity diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 45256c00b..74cd62833 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -28,18 +28,23 @@ -export_type([token_bucket_limiter/0]). %% a token bucket limiter with a limiter server's bucket reference --type token_bucket_limiter() :: #{ tokens := non_neg_integer() %% the number of tokens currently available +-type token_bucket_limiter() :: #{ %% the number of tokens currently available + tokens := non_neg_integer() , rate := decimal() , capacity := decimal() , lasttime := millisecond() - , max_retry_time := non_neg_integer() %% @see emqx_limiter_schema - , failure_strategy := failure_strategy() %% @see emqx_limiter_schema + %% @see emqx_limiter_schema + , max_retry_time := non_neg_integer() + %% @see emqx_limiter_schema + , failure_strategy := failure_strategy() , divisible := boolean() %% @see emqx_limiter_schema , low_water_mark := non_neg_integer() %% @see emqx_limiter_schema , bucket := bucket() %% the limiter server's bucket %% retry contenxt - , retry_ctx => undefined %% undefined meaning there is no retry context or no need to retry + %% undefined meaning no retry context or no need to retry + + , retry_ctx => undefined | retry_context(token_bucket_limiter()) %% the retry context , atom => any() %% allow to add other keys }. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 998bd9432..395174448 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -30,7 +30,10 @@ -export_type([container/0, check_result/0]). -type container() :: #{ limiter_type() => undefined | limiter() - , retry_key() => undefined | retry_context() | future() %% the retry context of the limiter + %% the retry context of the limiter + , retry_key() => undefined + | retry_context() + | future() , retry_ctx := undefined | any() %% the retry context of the container }. @@ -62,7 +65,8 @@ new(Types) -> %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter %% @end --spec get_limiter_by_names(list(limiter_type()), #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +-spec get_limiter_by_names(list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). get_limiter_by_names(Types, BucketNames) -> Init = fun(Type, Acc) -> Limiter = emqx_limiter_server:connect(Type, BucketNames), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 8295fa5d5..9cf4bc1d0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -24,7 +24,8 @@ %% API -export([ start_link/0, start_server/1, find_bucket/1 , find_bucket/2, insert_bucket/2, insert_bucket/3 - , make_path/2, make_path/3, restart_server/1]). + , make_path/2, restart_server/1 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -34,9 +35,7 @@ -type path() :: list(atom()). -type limiter_type() :: emqx_limiter_schema:limiter_type(). --type zone_name() :: emqx_limiter_schema:zone_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). --type bucket_path() :: emqx_limiter_schema:bucket_path(). %% counter record in ets table -record(bucket, { path :: path() @@ -58,10 +57,10 @@ start_server(Type) -> restart_server(Type) -> emqx_limiter_server_sup:restart(Type). --spec find_bucket(limiter_type(), bucket_path()) -> +-spec find_bucket(limiter_type(), bucket_name()) -> {ok, bucket_ref()} | undefined. -find_bucket(Type, BucketPath) -> - find_bucket(make_path(Type, BucketPath)). +find_bucket(Type, BucketName) -> + find_bucket(make_path(Type, BucketName)). -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. find_bucket(Path) -> @@ -73,26 +72,19 @@ find_bucket(Path) -> end. -spec insert_bucket(limiter_type(), - bucket_path(), + bucket_name(), bucket_ref()) -> boolean(). -insert_bucket(Type, BucketPath, Bucket) -> - inner_insert_bucket(make_path(Type, BucketPath), Bucket). +insert_bucket(Type, BucketName, Bucket) -> + inner_insert_bucket(make_path(Type, BucketName), Bucket). -spec insert_bucket(path(), bucket_ref()) -> true. insert_bucket(Path, Bucket) -> inner_insert_bucket(Path, Bucket). --spec make_path(limiter_type(), bucket_path()) -> path(). -make_path(Type, BucketPath) -> - [Type | BucketPath]. - --spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). -make_path(shared, _GroupName, BucketName) -> - [shared, BucketName]; - -make_path(Type, GroupName, BucketName) -> - [Type, GroupName, BucketName]. +-spec make_path(limiter_type(), bucket_name()) -> path(). +make_path(Type, BucketName) -> + [Type | BucketName]. %%-------------------------------------------------------------------- %% @doc diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 77cf9cb75..c1ca45e1b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -20,7 +20,8 @@ -export([ roots/0, fields/1, to_rate/1, to_capacity/1 , minimum_period/0, to_burst_rate/1, to_initial/1 - , namespace/0, to_bucket_path/1, default_group_name/0]). + , namespace/0, get_bucket_cfg_path/2 + ]). -define(KILOBYTE, 1024). @@ -31,7 +32,6 @@ | shared. -type bucket_name() :: atom(). --type zone_name() :: atom(). -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). -type capacity() :: infinity | number(). %% the capacity of the token bucket @@ -47,17 +47,15 @@ -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}). -typerefl_from_string({capacity/0, ?MODULE, to_capacity}). -typerefl_from_string({initial/0, ?MODULE, to_initial}). --typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}). -reflect_type([ rate/0 , burst_rate/0 , capacity/0 , initial/0 , failure_strategy/0 - , bucket_path/0 ]). --export_type([limiter_type/0, bucket_name/0, zone_name/0]). +-export_type([limiter_type/0, bucket_name/0, bucket_path/0]). -import(emqx_schema, [sc/2, map/2]). -define(UNIT_TIME_IN_MS, 1000). @@ -67,38 +65,24 @@ namespace() -> limiter. roots() -> [limiter]. fields(limiter) -> - [ {bytes_in, sc(ref(limiter_opts), #{})} - , {message_in, sc(ref(limiter_opts), #{})} - , {connection, sc(ref(limiter_opts), #{})} - , {message_routing, sc(ref(limiter_opts), #{})} - , {shared, sc(ref(shared_opts), - #{description => - <<"Some functions that do not need to use global and zone scope," - "them can shared use this type">>})} + [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})} + , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})} + , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})} + , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})} + , {batch, sc(ref(limiter_opts), + #{description => <<"Internal batch operation limiter">>})} ]; fields(limiter_opts) -> - fields(rate_burst) ++ %% the node global limit - [ {group, sc(map("group name", ref(group_opts)), #{})} - ]; - -fields(group_opts) -> - fields(rate_burst) ++ %% the group limite - [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} - ]; - -fields(rate_burst) -> [ {rate, sc(rate(), #{default => "infinity"})} , {burst, sc(burst_rate(), #{default => "0/0s"})} + , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} ]; -fields(shared_opts) -> - [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}]; - fields(bucket_opts) -> [ {rate, sc(rate(), #{})} - , {initial, sc(initial(), #{default => "0"})} , {capacity, sc(capacity(), #{})} + , {initial, sc(initial(), #{default => "0"})} , {per_client, sc(ref(client_bucket), #{default => #{}, desc => "The rate limit for each user of the bucket," @@ -137,9 +121,9 @@ minimum_period() -> to_rate(Str) -> to_rate(Str, true, false). -%% default group name for shared type limiter -default_group_name() -> - '_default'. +-spec get_bucket_cfg_path(limiter_type(), bucket_name()) -> bucket_path(). +get_bucket_cfg_path(Type, BucketName) -> + [limiter, Type, bucket, BucketName]. %%-------------------------------------------------------------------- %% Internal functions @@ -213,14 +197,3 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE; apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). - -to_bucket_path(Str) -> - Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")], - case Dirs of - [_Group, _Bucket] = Path -> - {ok, Path}; - [_Bucket] = Path -> - {ok, Path}; - _ -> - {error, Str} - end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index b456d7046..95c8a47ec 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -34,28 +34,16 @@ terminate/2, code_change/3, format_status/2]). -export([ start_link/1, connect/2, info/1 - , name/1, get_initial_val/1]). + , name/1, get_initial_val/1 + ]). -type root() :: #{ rate := rate() %% number of tokens generated per period , burst := rate() , period := pos_integer() %% token generation interval(second) - , childs := list(node_id()) %% node children , consumed := non_neg_integer() }. --type zone() :: #{ id := node_id() - , name := zone_name() - , rate := rate() - , burst := rate() - , obtained := non_neg_integer() %% number of tokens obtained - , childs := list(node_id()) - }. - --type bucket() :: #{ id := node_id() - , name := bucket_name() - %% pointer to zone node, use for burst - %% it also can use nodeId, nodeId is more direct, but nodeName is clearer - , zone := zone_name() +-type bucket() :: #{ name := bucket_name() , rate := rate() , obtained := non_neg_integer() , correction := emqx_limiter_decimal:zero_or_float() %% token correction value @@ -64,19 +52,14 @@ , index := undefined | index() }. --type state() :: #{ root := undefined | root() +-type state() :: #{ type := limiter_type() + , root := undefined | root() + , buckets := buckets() , counter := undefined | counters:counters_ref() %% current counter to alloc , index := index() - , zones := #{zone_name() => node_id()} - , buckets := list(node_id()) - , nodes := nodes() - , type := limiter_type() }. --type node_id() :: pos_integer(). --type node_data() :: zone() | bucket(). --type nodes() :: #{node_id() => node_data()}. --type zone_name() :: emqx_limiter_schema:zone_name(). +-type buckets() :: #{bucket_name() => bucket()}. -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). -type rate() :: decimal(). @@ -84,11 +67,10 @@ -type capacity() :: decimal(). -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). --type bucket_path() :: emqx_limiter_schema:bucket_path(). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter --define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end). +-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -99,28 +81,22 @@ %% API %%-------------------------------------------------------------------- -spec connect(limiter_type(), - bucket_path() | #{limiter_type() => bucket_path() | undefined}) -> + bucket_name() | #{limiter_type() => bucket_name() | undefined}) -> emqx_htb_limiter:limiter(). %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> emqx_htb_limiter:make_infinity_limiter(undefined); -%% Shared type can use bucket name directly -connect(shared, BucketName) when is_atom(BucketName) -> - connect(shared, [BucketName]); - -connect(Type, BucketPath) when is_list(BucketPath) -> - FullPath = get_bucket_full_cfg_path(Type, BucketPath), - case emqx:get_config(FullPath, undefined) of - undefined -> - io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]), - io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), - ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}), - throw("bucket's config not found"); - #{rate := AggrRate, - capacity := AggrSize, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> - case emqx_limiter_manager:find_bucket(Type, BucketPath) of +connect(Type, BucketName) when is_atom(BucketName) -> + CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), + case emqx:get_config(CfgPath, undefined) of + undefined -> + ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}), + throw("bucket's config not found"); + #{rate := AggrRate, + capacity := AggrSize, + per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> + case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); @@ -130,8 +106,7 @@ connect(Type, BucketPath) when is_list(BucketPath) -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; undefined -> - io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), - ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}), + ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}), throw("invalid bucket") end end; @@ -172,13 +147,12 @@ start_link(Type) -> {stop, Reason :: term()} | ignore. init([Type]) -> - State = #{root => undefined, - counter => undefined, - index => 1, - zones => #{}, - nodes => #{}, - buckets => [], - type => Type}, + State = #{ type => Type + , root => undefined + , counter => undefined + , index => 1 + , buckets => #{} + }, State2 = init_tree(Type, State), #{root := #{period := Perido}} = State2, oscillate(Perido), @@ -289,59 +263,38 @@ oscillate(Interval) -> -spec oscillation(state()) -> state(). oscillation(#{root := #{rate := Flow, period := Interval, - childs := ChildIds, consumed := Consumed} = Root, - nodes := Nodes} = State) -> + buckets := Buckets} = State) -> oscillate(Interval), - Childs = get_ordered_childs(ChildIds, Nodes), - {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), - maybe_burst(State#{nodes := Nodes2, + Ordereds = get_ordered_buckets(Buckets), + {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets), + maybe_burst(State#{buckets := Buckets2, root := Root#{consumed := Consumed + Alloced}}). %% @doc horizontal spread --spec transverse(list(node_data()), +-spec transverse(list(bucket()), flow(), non_neg_integer(), - nodes()) -> {non_neg_integer(), nodes()}. -transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 -> - {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes), - InFlow2 = sub(InFlow, NodeAlloced), - Alloced2 = Alloced + NodeAlloced, - transverse(T, InFlow2, Alloced2, Nodes2); + buckets()) -> {non_neg_integer(), buckets()}. +transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 -> + {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets), + InFlow2 = sub(InFlow, BucketAlloced), + Alloced2 = Alloced + BucketAlloced, + transverse(T, InFlow2, Alloced2, Buckets2); -transverse(_, _, Alloced, Nodes) -> - {Alloced, Nodes}. +transverse(_, _, Alloced, Buckets) -> + {Alloced, Buckets}. %% @doc vertical spread --spec longitudinal(node_data(), flow(), nodes()) -> - {non_neg_integer(), nodes()}. -longitudinal(#{id := Id, - rate := Rate, - obtained := Obtained, - childs := ChildIds} = Node, InFlow, Nodes) -> - Flow = erlang:min(InFlow, Rate), - - if Flow > 0 -> - Childs = get_ordered_childs(ChildIds, Nodes), - {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), - if Alloced > 0 -> - {Alloced, - Nodes2#{Id => Node#{obtained := Obtained + Alloced}}}; - true -> - %% childs are empty or all counter childs are full - {0, Nodes2} - end; - true -> - {0, Nodes} - end; - -longitudinal(#{id := Id, +-spec longitudinal(bucket(), flow(), buckets()) -> + {non_neg_integer(), buckets()}. +longitudinal(#{name := Name, rate := Rate, capacity := Capacity, counter := Counter, index := Index, - obtained := Obtained} = Node, - InFlow, Nodes) when Counter =/= undefined -> + obtained := Obtained} = Bucket, + InFlow, Buckets) when Counter =/= undefined -> Flow = erlang:min(InFlow, Rate), ShouldAlloc = @@ -361,224 +314,149 @@ longitudinal(#{id := Id, %% XXX if capacity is infinity, and flow always > 0, the value in %% counter will be overflow at some point in the future, do we need %% to deal with this situation??? - {Inc, Node2} = emqx_limiter_correction:add(Available, Node), + {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), {Inc, - Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; + Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}}; _ -> - {0, Nodes} + {0, Buckets} end; -longitudinal(_, _, Nodes) -> - {0, Nodes}. +longitudinal(_, _, Buckets) -> + {0, Buckets}. --spec get_ordered_childs(list(node_id()), nodes()) -> list(node_data()). -get_ordered_childs(Ids, Nodes) -> - Childs = [maps:get(Id, Nodes) || Id <- Ids], +-spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()). +get_ordered_buckets(Buckets) when is_map(Buckets) -> + BucketList = maps:values(Buckets), + get_ordered_buckets(BucketList); +get_ordered_buckets(Buckets) -> %% sort by obtained, avoid node goes hungry lists:sort(fun(#{obtained := A}, #{obtained := B}) -> A < B end, - Childs). + Buckets). -spec maybe_burst(state()) -> state(). maybe_burst(#{buckets := Buckets, - zones := Zones, - root := #{burst := Burst}, - nodes := Nodes} = State) when Burst > 0 -> - %% find empty buckets and group by zone name - GroupFun = fun(Id, Groups) -> - %% TODO filter undefined counter - #{counter := Counter, - index := Index, - zone := Zone} = maps:get(Id, Nodes), - case counters:get(Counter, Index) of - Any when Any =< 0 -> - Group = maps:get(Zone, Groups, []), - maps:put(Zone, [Id | Group], Groups); - _ -> - Groups - end - end, + root := #{burst := Burst}} = State) when Burst > 0 -> + Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined -> + case counters:get(Cnt, Idx) > 0 of + true -> + Acc; + false -> + [Bucket | Acc] + end; + (_Name, _Bucket, Acc) -> + Acc + end, - case lists:foldl(GroupFun, #{}, Buckets) of - Groups when map_size(Groups) > 0 -> - %% remove the zone which don't support burst - Filter = fun({Name, Childs}, Acc) -> - ZoneId = maps:get(Name, Zones), - #{burst := ZoneBurst} = Zone = maps:get(ZoneId, Nodes), - case ZoneBurst > 0 of - true -> - [{Zone, Childs} | Acc]; - _ -> - Acc - end - end, - - FilterL = lists:foldl(Filter, [], maps:to_list(Groups)), - dispatch_burst(FilterL, State); - _ -> - State - end; + Empties = maps:fold(Fold, [], Buckets), + dispatch_burst(Empties, Burst, State); maybe_burst(State) -> State. --spec dispatch_burst(list({zone(), list(node_id())}), state()) -> state(). -dispatch_burst([], State) -> +-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state(). +dispatch_burst([], _, State) -> State; -dispatch_burst(GroupL, - #{root := #{burst := Burst}, - nodes := Nodes} = State) -> - InFlow = Burst / erlang:length(GroupL), - Dispatch = fun({Zone, Childs}, NodeAcc) -> - #{id := ZoneId, - burst := ZoneBurst, - obtained := Obtained} = Zone, +dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) -> + EachFlow = InFlow / erlang:length(Empties), + {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), + State#{consumed := Consumed + Alloced, buckets := Buckets2}. - case erlang:min(InFlow, ZoneBurst) of - 0 -> NodeAcc; - ZoneFlow -> - EachFlow = ZoneFlow / erlang:length(Childs), - {Alloced, NodeAcc2} = - dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), - Zone2 = Zone#{obtained := Obtained + Alloced}, - NodeAcc2#{ZoneId := Zone2} - end - end, - State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. - --spec dispatch_burst_to_buckets(list(node_id()), +-spec dispatch_burst_to_buckets(list(bucket()), float(), - non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. -dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> - #{counter := Counter, + non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}. +dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> + #{name := Name, + counter := Counter, index := Index, - obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), + obtained := Obtained} = Bucket, {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), counters:add(Counter, Index, Inc), - Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, - dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); + Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}, + dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2); -dispatch_burst_to_buckets([], _, Alloced, Nodes) -> - {Alloced, Nodes}. +dispatch_burst_to_buckets([], _, Alloced, Buckets) -> + {Alloced, Buckets}. -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> - Cfg = emqx:get_config([limiter, Type]), - GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg), - case GlobalCfg of - #{group := Group} -> ok; - #{bucket := _} -> - Group = make_shared_default_group(GlobalCfg), - ok - end, + #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), + {Factor, Root} = make_root(Cfg), + {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), - {Factor, Root} = make_root(GlobalCfg), - {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type, - GlobalCfg, Factor, 1, #{}, #{}, #{}), - - State2 = State#{root := Root#{childs := maps:values(Zones)}, - zones := Zones, - nodes := Nodes, - buckets := maps:keys(DelayBuckets), - counter := counters:new(maps:size(DelayBuckets), [write_concurrency]) + State2 = State#{root := Root, + counter := counters:new(CounterNum, [write_concurrency]) }, - lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)). + lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets). -spec make_root(hocons:confg()) -> {number(), root()}. +make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 -> + {1, #{rate => Rate, + burst => Burst, + period => emqx_limiter_schema:minimum_period(), + consumed => 0}}; + make_root(#{rate := Rate, burst := Burst}) -> MiniPeriod = emqx_limiter_schema:minimum_period(), - if Rate >= 1 -> - {1, #{rate => Rate, - burst => Burst, - period => MiniPeriod, - childs => [], - consumed => 0}}; - true -> - Factor = 1 / Rate, - {Factor, #{rate => 1, - burst => Burst * Factor, - period => erlang:floor(Factor * MiniPeriod), - childs => [], - consumed => 0}} - end. + Factor = 1 / Rate, + {Factor, #{rate => 1, + burst => Burst * Factor, + period => erlang:floor(Factor * MiniPeriod), + consumed => 0}}. -make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) -> - #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg, - BucketCfgs = maps:to_list(BucketMap), - - FirstChildId = NodeId + 1, - Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}), - ChildNum = maps:size(Buckets), - NextZoneId = FirstChildId + ChildNum, - - Zone = #{id => NodeId, - name => Name, - rate => mul(Rate, Factor), - burst => Burst, - obtained => 0, - childs => maps:keys(Buckets) - }, - - make_zone(T, Type, GlobalCfg, Factor, NextZoneId, - Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets) - ); - -make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) -> - {Zones, Nodes, DelayBuckets}. - -make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) -> - Path = emqx_limiter_manager:make_path(Type, ZoneName, Name), - case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of +make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) -> + Path = emqx_limiter_manager:make_path(Type, Name), + case get_counter_rate(Conf, GlobalCfg) of infinity -> Rate = infinity, Capacity = infinity, Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), emqx_limiter_manager:insert_bucket(Path, Ref), - InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> - State#{nodes := Nodes#{NodeId => Node}} + CounterNum2 = CounterNum, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> + State#{buckets := Buckets#{BucketName => Bucket}} end; RawRate -> #{capacity := Capacity} = Conf, Initial = get_initial_val(Conf), Rate = mul(RawRate, Factor), - - InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + CounterNum2 = CounterNum + 1, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), - Node2 = Node#{counter := Counter, index := Idx}, - State2#{nodes := Nodes#{NodeId => Node2}} + Bucket2 = Bucket#{counter := Counter, index := Idx}, + State2#{buckets := Buckets#{BucketName => Bucket2}} end end, - Node = #{ id => Id - , name => Name - , zone => ZoneName - , rate => Rate - , obtained => 0 - , correction => 0 - , capacity => Capacity - , counter => undefined - , index => undefined}, + Bucket = #{ name => Name + , rate => Rate + , obtained => 0 + , correction => 0 + , capacity => Capacity + , counter => undefined + , index => undefined}, - DelayInit = ?CURRYING(Node, InitFun), + DelayInit = ?CURRYING(Bucket, InitFun), make_bucket(T, - Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit}); + Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]); -make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) -> - DelayBuckets. +make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) -> + {CounterNum, DelayBuckets}. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> {counters:counters_ref(), pos_integer(), state()}. alloc_counter(Path, Rate, Initial, #{counter := Counter, index := Index} = State) -> + case emqx_limiter_manager:find_bucket(Path) of {ok, #{counter := ECounter, index := EIndex}} when ECounter =/= undefined -> @@ -594,22 +472,14 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> emqx_limiter_manager:insert_bucket(Path, Ref), {Counter, Index, State}. + %% @doc find first limited node -get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) -> - Search = lists:search(fun(E) -> is_limited(E) end, - [BucketCfg, ZoneCfg, GlobalCfg]), - case Search of - {value, #{rate := Rate}} -> - Rate; - false -> - infinity - end. +get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg) + when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity + Rate; -is_limited(#{rate := Rate, capacity := Capacity}) -> - Rate =/= infinity orelse Capacity =/= infinity; - -is_limited(#{rate := Rate}) -> - Rate =/= infinity. +get_counter_rate(_Cfg, #{rate := Rate}) -> + Rate. -spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{initial := Initial, @@ -625,15 +495,3 @@ get_initial_val(#{initial := Initial, true -> 0 end. - --spec make_shared_default_group(hocons:config()) -> honcs:config(). -make_shared_default_group(Cfg) -> - GroupName = emqx_limiter_schema:default_group_name(), - #{GroupName => Cfg#{rate => infinity, burst => 0}}. - --spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()). -get_bucket_full_cfg_path(shared, [BucketName]) -> - [limiter, shared, bucket, BucketName]; - -get_bucket_full_cfg_path(Type, [GroupName, BucketName]) -> - [limiter, Type, group, GroupName, bucket, BucketName]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e4da8a5cc..a450cc245 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})} + sc(map("ratelimit's type", atom()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index e561bc52f..05cc1dcd0 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -58,9 +58,7 @@ retainer { batch_deliver_number = 0 ## deliver limiter bucket - ## - ## Default: 0s - limiter_bucket_name = retainer + limiter.batch = retainer } ## Maximum retained message size. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index d57c49799..a1cc42898 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -85,8 +85,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), - Limiter = emqx_limiter_server:connect(shared, Bucket), + LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), + Limiter = emqx_limiter_server:connect(batch, LimiterCfg), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {noreply, State#{limiter := Limiter2}}; handle_cast(refresh_limiter, State) -> - Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), - Limiter = emqx_limiter_server:connect(shared, Bucket), + LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), + Limiter = emqx_limiter_server:connect(batch, LimiterCfg), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 395f0e363..492a89021 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -29,7 +29,7 @@ fields(mnesia_config) -> fields(flow_control) -> [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} , {batch_deliver_number, sc(range(0, 1000), 0)} - , {limiter_bucket_name, sc(atom(), retainer)} + , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})} ]. %%-------------------------------------------------------------------- From d28b34f0d159e0fee057d7fd7e050ec1fb74332a Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 14 Mar 2022 18:15:50 +0800 Subject: [PATCH 3/4] fix(limiter): improve test case and fix some bugs --- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 33 +- .../src/emqx_limiter/src/emqx_limiter_app.erl | 3 +- .../src/emqx_limiter_container.erl | 9 +- .../emqx_limiter/src/emqx_limiter_schema.erl | 4 +- .../emqx_limiter/src/emqx_limiter_server.erl | 43 +- .../src/emqx_limiter_server_sup.erl | 1 + apps/emqx/test/emqx_channel_SUITE.erl | 95 ++-- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 431 +++++------------- apps/emqx/test/emqx_ws_connection_SUITE.erl | 20 +- .../src/emqx_dashboard_swagger.erl | 2 - .../src/emqx_retainer_dispatcher.erl | 7 +- .../test/emqx_retainer_SUITE.erl | 17 +- 12 files changed, 241 insertions(+), 424 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 74cd62833..372944666 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -22,7 +22,7 @@ %% API -export([ make_token_bucket_limiter/2, make_ref_limiter/2, check/2 - , consume/2, set_retry/2, retry/1, make_infinity_limiter/1 + , consume/2, set_retry/2, retry/1, make_infinity_limiter/0 , make_future/1, available/1 ]). -export_type([token_bucket_limiter/0]). @@ -108,6 +108,8 @@ -import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]). +-elvis([{elvis_style, no_if_expression, disable}]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -124,8 +126,8 @@ make_token_bucket_limiter(Cfg, Bucket) -> make_ref_limiter(Cfg, Bucket) when Bucket =/= infinity -> Cfg#{bucket => Bucket}. --spec make_infinity_limiter(limiter_bucket_cfg()) -> infinity. -make_infinity_limiter(_) -> +-spec make_infinity_limiter() -> infinity. +make_infinity_limiter() -> infinity. %% @doc request some tokens @@ -252,12 +254,11 @@ try_consume(_, _, Limiter) -> -spec do_check(acquire_type(Limiter), Limiter) -> inner_check_result(Limiter) when Limiter :: limiter(). -do_check(Need, #{tokens := Tokens} = Limiter) -> - if Need =< Tokens -> - do_check_with_parent_limiter(Need, Limiter); - true -> - do_reset(Need, Limiter) - end; +do_check(Need, #{tokens := Tokens} = Limiter) when Need =< Tokens -> + do_check_with_parent_limiter(Need, Limiter); + +do_check(Need, #{tokens := _} = Limiter) -> + do_reset(Need, Limiter); do_check(Need, #{divisible := Divisible, bucket := Bucket} = Ref) -> @@ -280,7 +281,8 @@ on_failure(throw, Limiter) -> Message = io_lib:format("limiter consume failed, limiter:~p~n", [Limiter]), erlang:throw({rate_check_fail, Message}). --spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> inner_check_result(token_bucket_limiter()). +-spec do_check_with_parent_limiter(pos_integer(), token_bucket_limiter()) -> + inner_check_result(token_bucket_limiter()). do_check_with_parent_limiter(Need, #{tokens := Tokens, divisible := Divisible, @@ -306,15 +308,16 @@ do_reset(Need, capacity := Capacity} = Limiter) -> Now = ?NOW, Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), - if Tokens2 >= Need -> + Available = erlang:floor(Tokens2), + if Available >= Need -> Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now}, do_check_with_parent_limiter(Need, Limiter2); - Divisible andalso Tokens2 > 0 -> + Divisible andalso Available > 0 -> %% must be allocated here, because may be Need > Capacity return_pause(Rate, partial, fun do_reset/2, - Need - Tokens2, + Need - Available, Limiter#{tokens := 0, lasttime := Now}); true -> return_pause(Rate, pause, fun do_reset/2, Need, Limiter) @@ -331,8 +334,8 @@ return_pause(Rate, PauseType, Fun, Diff, Limiter) -> Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. --spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> retry_context(Limiter) - when Limiter :: limiter(). +-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> + retry_context(Limiter) when Limiter :: limiter(). make_retry_context(Fun, Diff) -> #{continuation => Fun, diff => Diff}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl index b4a92596f..6e66645f3 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_app.erl @@ -39,8 +39,7 @@ {ok, Pid :: pid(), State :: term()} | {error, Reason :: term()}. start(_StartType, _StartArgs) -> - {ok, _} = Result = emqx_limiter_sup:start_link(), - Result. + {ok, _} = emqx_limiter_sup:start_link(). %%-------------------------------------------------------------------- %% @private diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 395174448..65b213485 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -21,7 +21,7 @@ %% @end %% API --export([ new/0, new/1, get_limiter_by_names/2 +-export([ new/0, new/1, new/2, get_limiter_by_names/2 , add_new/3, update_by_name/3, set_retry_context/2 , check/3, retry/2, get_retry_context/1 , check_list/2, retry_list/2 @@ -60,7 +60,12 @@ new() -> %% @doc generate default data according to the type of limiter -spec new(list(limiter_type())) -> container(). new(Types) -> - get_limiter_by_names(Types, #{}). + new(Types, #{}). + +-spec new(list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +new(Types, Names) -> + get_limiter_by_names(Types, Names). %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index c1ca45e1b..b9b25e806 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -29,7 +29,7 @@ | message_in | connection | message_routing - | shared. + | batch. -type bucket_name() :: atom(). -type rate() :: infinity | float(). @@ -142,7 +142,7 @@ to_rate(Str, CanInfinity, CanZero) -> {ok, Val} = to_capacity(QuotaStr), check_capacity(Str, Val, CanZero, fun(Quota) -> - Quota * minimum_period() / ?UNIT_TIME_IN_MS + {ok, Quota * minimum_period() / ?UNIT_TIME_IN_MS} end); [QuotaStr, Interval] -> {ok, Val} = to_capacity(QuotaStr), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 95c8a47ec..896792a32 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -34,7 +34,7 @@ terminate/2, code_change/3, format_status/2]). -export([ start_link/1, connect/2, info/1 - , name/1, get_initial_val/1 + , name/1, get_initial_val/1, update_config/1 ]). -type root() :: #{ rate := rate() %% number of tokens generated per period @@ -85,7 +85,7 @@ emqx_htb_limiter:limiter(). %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> - emqx_htb_limiter:make_infinity_limiter(undefined); + emqx_htb_limiter:make_infinity_limiter(); connect(Type, BucketName) when is_atom(BucketName) -> CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), @@ -101,7 +101,7 @@ connect(Type, BucketName) when is_atom(BucketName) -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); Bucket =:= infinity -> - emqx_htb_limiter:make_infinity_limiter(Cfg); + emqx_htb_limiter:make_infinity_limiter(); true -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; @@ -122,6 +122,10 @@ info(Type) -> name(Type) -> erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). +-spec update_config(limiter_type()) -> ok. +update_config(Type) -> + ?CALL(Type). + %%-------------------------------------------------------------------- %% @doc %% Starts the server @@ -131,6 +135,7 @@ name(Type) -> start_link(Type) -> gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). + %%-------------------------------------------------------------------- %%% gen_server callbacks %%-------------------------------------------------------------------- @@ -147,16 +152,10 @@ start_link(Type) -> {stop, Reason :: term()} | ignore. init([Type]) -> - State = #{ type => Type - , root => undefined - , counter => undefined - , index => 1 - , buckets => #{} - }, - State2 = init_tree(Type, State), - #{root := #{period := Perido}} = State2, + State = init_tree(Type), + #{root := #{period := Perido}} = State, oscillate(Perido), - {ok, State2}. + {ok, State}. %%-------------------------------------------------------------------- %% @private @@ -176,6 +175,10 @@ init([Type]) -> handle_call(info, _From, State) -> {reply, State, State}; +handle_call(update_config, _From, #{type := Type}) -> + NewState = init_tree(Type), + {reply, ok, NewState}; + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -362,10 +365,11 @@ maybe_burst(State) -> dispatch_burst([], _, State) -> State; -dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) -> +dispatch_burst(Empties, InFlow, + #{root := #{consumed := Consumed} = Root, buckets := Buckets} = State) -> EachFlow = InFlow / erlang:length(Empties), {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), - State#{consumed := Consumed + Alloced, buckets := Buckets2}. + State#{root := Root#{consumed := Consumed + Alloced}, buckets := Buckets2}. -spec dispatch_burst_to_buckets(list(bucket()), float(), @@ -385,8 +389,15 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> dispatch_burst_to_buckets([], _, Alloced, Buckets) -> {Alloced, Buckets}. --spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). -init_tree(Type, State) -> +-spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). +init_tree(Type) -> + State = #{ type => Type + , root => undefined + , counter => undefined + , index => 1 + , buckets => #{} + }, + #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), {Factor, Root} = make_root(Cfg), {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 7f8d227ec..70332481a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -46,6 +46,7 @@ start(Type) -> Spec = make_child(Type), supervisor:start_child(?MODULE, Spec). +%% XXX This is maybe a workaround, not so good -spec restart(emqx_limiter_schema:limiter_type()) -> _. restart(Type) -> Id = emqx_limiter_server:name(Type), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index d97b0197b..256a271b2 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -113,57 +113,32 @@ listener_mqtt_ws_conf() -> listeners_conf() -> #{tcp => #{default => listener_mqtt_tcp_conf()}, ws => #{default => listener_mqtt_ws_conf()} - }. + }. limiter_conf() -> - #{bytes_in => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - connection => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - message_in => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}, - message_routing => - #{bucket => - #{default => - #{aggregated => - #{capacity => infinity,initial => 0,rate => infinity}, - per_client => - #{capacity => infinity,divisible => false, - failure_strategy => force,initial => 0,low_water_mark => 0, - max_retry_time => 5000,rate => infinity}, - zone => default}}, - global => #{burst => 0,rate => infinity}, - zone => #{default => #{burst => 0,rate => infinity}}}}. + Make = fun() -> + #{bucket => + #{default => + #{capacity => infinity, + initial => 0, + rate => infinity, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity + } + } + }, + burst => 0, + rate => infinity + } + end, + + lists:foldl(fun(Name, Acc) -> + Acc#{Name => Make()} + end, + #{}, + [bytes_in, message_in, message_routing, connection, batch]). stats_conf() -> #{enable => true}. @@ -232,7 +207,7 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) -> OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - modify_limiter(TestCase, OldConf), + check_modify_limiter(TestCase), [{config, OldConf}|Config]. end_per_testcase(_TestCase, Config) -> @@ -240,18 +215,19 @@ end_per_testcase(_TestCase, Config) -> emqx_common_test_helpers:stop_apps([]), Config. -modify_limiter(TestCase, NewConf) -> +check_modify_limiter(TestCase) -> Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2], case lists:member(TestCase, Checks) of true -> - modify_limiter(NewConf); + modify_limiter(); _ -> ok end. %% per_client 5/1s,5 %% aggregated 10/1s,10 -modify_limiter(#{limiter := Limiter} = NewConf) -> +modify_limiter() -> + Limiter = emqx_config:get([limiter]), #{message_routing := #{bucket := Bucket} = Routing} = Limiter, #{default := #{per_client := Client} = Default} = Bucket, Client2 = Client#{rate := 5, @@ -259,16 +235,15 @@ modify_limiter(#{limiter := Limiter} = NewConf) -> capacity := 5, low_water_mark := 1}, Default2 = Default#{per_client := Client2, - aggregated := #{rate => 10, - initial => 0, - capacity => 10 - }}, + rate => 10, + initial => 0, + capacity => 10}, Bucket2 = Bucket#{default := Default2}, Routing2 = Routing#{bucket := Bucket2}, - NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}}, - emqx_config:put(NewConf2), + emqx_config:put([limiter], Limiter#{message_routing := Routing2}), emqx_limiter_manager:restart_server(message_routing), + timer:sleep(100), ok. %%-------------------------------------------------------------------- @@ -1078,4 +1053,4 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()). -limiter_cfg() -> #{}. +limiter_cfg() -> #{message_routing => default}. diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index ea2e31700..589e78e8e 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -27,59 +27,37 @@ -define(BASE_CONF, <<""" limiter { bytes_in { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = \"100MB/1s\" - per_client.capacity = infinity + rate = infinity + capacity = infinity } } message_in { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } connection { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } message_routing { - global.rate = infinity - zone.default.rate = infinity bucket.default { - zone = default - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } - shared { + batch { bucket.retainer { - aggregated.rate = infinity - aggregated.capacity = infinity - per_client.rate = infinity - per_client.capacity = infinity + rate = infinity + capacity = infinity } } } @@ -97,6 +75,7 @@ limiter { -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(RATE(Rate), to_rate(Rate)). -define(NOW, erlang:system_time(millisecond)). +-define(CONST(X), fun(_) -> X end). %%-------------------------------------------------------------------- %% Setups @@ -231,12 +210,11 @@ t_low_water_mark(_) -> with_per_client(default, Cfg, Case). t_infinity_client(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - capacity := infinity}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := infinity, + capacity := infinity}, Cli2 = Cli#{rate := infinity, capacity := infinity}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -247,14 +225,13 @@ t_infinity_client(_) -> with_bucket(default, Fun, Case). t_try_restore_agg(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := 1, - capacity := 200, - initial := 50}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := 1, + capacity := 200, + initial := 50}, Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true, max_retry_time := 100, failure_strategy := force}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -267,15 +244,14 @@ t_try_restore_agg(_) -> with_bucket(default, Fun, Case). t_short_board(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/1s"), - initial := 0, - capacity := 100}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/1s"), + initial := 0, + capacity := 100}, Cli2 = Cli#{rate := ?RATE("600/1s"), capacity := 600, initial := 600}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Counter = counters:new(1, []), @@ -286,15 +262,14 @@ t_short_board(_) -> with_bucket(default, Fun, Case). t_rate(_) -> - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/100ms"), - initial := 0, - capacity := infinity}, + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/100ms"), + initial := 0, + capacity := infinity}, Cli2 = Cli#{rate := infinity, capacity := infinity, initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> Client = connect(default), @@ -311,113 +286,74 @@ t_rate(_) -> t_capacity(_) -> Capacity = 600, - Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("100/100ms"), - initial := 0, - capacity := 600}, - Cli2 = Cli#{rate := infinity, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Fun = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("100/100ms"), + initial := 0, + capacity := 600}, + Cli2 = Cli#{rate := infinity, + capacity := infinity, + initial := 0}, + Bucket2#{per_client := Cli2} end, Case = fun() -> - Client = connect(default), - timer:sleep(1000), + Client = connect(default), + timer:sleep(1000), C1 = emqx_htb_limiter:available(Client), ?assertEqual(Capacity, C1, "test bucket capacity") end, with_bucket(default, Fun, Case). -%%-------------------------------------------------------------------- -%% Test Cases Zone Level -%%-------------------------------------------------------------------- -t_limit_zone_with_unlimit_bucket(_) -> - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := infinity, - initial := 0, - capacity := infinity, - divisible := true}, - Bucket#{aggregated := Aggr2, per_client := Cli2} - end, - - Case = fun() -> - C1 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 600) - end, - - with_zone(default, ZoneMod, [{b1, Bucket}], Case). - - %%-------------------------------------------------------------------- %% Test Cases Global Level %%-------------------------------------------------------------------- -t_burst_and_fairness(_) -> +t_collaborative_alloc(_) -> GlobalMod = fun(Cfg) -> - Cfg#{burst := ?RATE("60/1s")} + Cfg#{rate := ?RATE("600/1s")} end, - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("500/1s"), - initial := 0, - capacity := 500}, - Cli2 = Cli#{rate := ?RATE("600/1s"), - capacity := 600, - initial := 600}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket1 = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("400/1s"), + initial := 0, + capacity := 600}, + Cli2 = Cli#{rate := ?RATE("50"), + capacity := 100, + initial := 100}, + Bucket2#{per_client := Cli2} end, + Bucket2 = fun(Bucket) -> + Bucket2 = Bucket1(Bucket), + Bucket2#{rate := ?RATE("200/1s")} + end, + Case = fun() -> C1 = counters:new(1, []), C2 = counters:new(1, []), start_client(b1, ?NOW + 2000, C1, 20), start_client(b2, ?NOW + 2000, C2, 30), timer:sleep(2100), - check_average_rate(C1, 2, 330), - check_average_rate(C2, 2, 330) + check_average_rate(C1, 2, 300), + check_average_rate(C2, 2, 300) end, with_global(GlobalMod, - default, - ZoneMod, - [{b1, Bucket}, {b2, Bucket}], + [{b1, Bucket1}, {b2, Bucket2}], Case). t_burst(_) -> GlobalMod = fun(Cfg) -> - Cfg#{burst := ?RATE("60/1s")} + Cfg#{rate := ?RATE("200/1s"), + burst := ?RATE("400/1s")} end, - ZoneMod = fun(Cfg) -> - Cfg#{rate := ?RATE("60/1s"), - burst := ?RATE("60/1s")} - end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := ?RATE("500/1s"), - initial := 0, - capacity := 500}, - Cli2 = Cli#{rate := ?RATE("600/1s"), - capacity := 600, + Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := ?RATE("200/1s"), + initial := 0, + capacity := 200}, + Cli2 = Cli#{rate := ?RATE("50/1s"), + capacity := 200, divisible := true}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket2#{per_client := Cli2} end, Case = fun() -> @@ -430,180 +366,39 @@ t_burst(_) -> timer:sleep(2100), Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]), - in_range(Total / 2, 30) + in_range(Total / 2, 300) end, with_global(GlobalMod, - default, - ZoneMod, [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}], Case). - t_limit_global_with_unlimit_other(_) -> GlobalMod = fun(Cfg) -> Cfg#{rate := ?RATE("600/1s")} end, - ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end, - - Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := infinity, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2} + Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket2 = Bucket#{rate := infinity, + initial := 0, + capacity := infinity}, + Cli2 = Cli#{rate := infinity, + capacity := infinity, + initial := 0}, + Bucket2#{per_client := Cli2} end, Case = fun() -> - C1 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 600) + C1 = counters:new(1, []), + start_client(b1, ?NOW + 2000, C1, 20), + timer:sleep(2100), + check_average_rate(C1, 2, 600) end, with_global(GlobalMod, - default, - ZoneMod, [{b1, Bucket}], Case). -t_multi_zones(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("400/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("500/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 25), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 300), - check_average_rate(C2, 2, 300) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}], - Case). - -%% because the simulated client will try to reach the maximum rate -%% when divisiable = true, a large number of divided tokens will be generated -%% so this is not an accurate test -t_multi_zones_with_divisible(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("400/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("500/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := Rate, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - divisible := true, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 25), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 300), - check_average_rate(C2, 2, 300) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("400/1s"))}, {b2, Bucket(z2, ?RATE("500/1s"))}], - Case). - -t_zone_hunger_and_fair(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone1 = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} - end, - - Zone2 = fun(Cfg) -> - Cfg#{rate := ?RATE("50/1s")} - end, - - Bucket = fun(Zone, Rate) -> - fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> - Aggr2 = Aggr#{rate := infinity, - initial := 0, - capacity := infinity}, - Cli2 = Cli#{rate := Rate, - capacity := infinity, - initial := 0}, - Bucket#{aggregated := Aggr2, - per_client := Cli2, - zone := Zone} - end - end, - - Case = fun() -> - C1 = counters:new(1, []), - C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - start_client(b2, ?NOW + 2000, C2, 20), - timer:sleep(2100), - check_average_rate(C1, 2, 550), - check_average_rate(C2, 2, 50) - end, - - with_global(GlobalMod, - [z1, z2], - [Zone1, Zone2], - [{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}], - Case). - %%-------------------------------------------------------------------- %% Test Cases container %%-------------------------------------------------------------------- @@ -626,7 +421,8 @@ t_check_container(_) -> capacity := 1000} end, Case = fun() -> - C1 = emqx_limiter_container:new([message_routing]), + C1 = emqx_limiter_container:new([message_routing], + #{message_routing => default}), {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), timer:sleep(Pause), @@ -663,9 +459,7 @@ t_limiter_server(_) -> ?assertMatch(#{root := _, counter := _, index := _, - zones := _, buckets := _, - nodes := _, type := message_routing}, State), Name = emqx_limiter_server:name(message_routing), @@ -675,6 +469,32 @@ t_limiter_server(_) -> ok = emqx_limiter_server:format_status(normal, ok), ok. +t_decimal(_) -> + ?assertEqual(infinity, emqx_limiter_decimal:add(infinity, 3)), + ?assertEqual(5, emqx_limiter_decimal:add(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:sub(infinity, 3)), + ?assertEqual(-1, emqx_limiter_decimal:sub(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:mul(infinity, 3)), + ?assertEqual(6, emqx_limiter_decimal:mul(2, 3)), + ?assertEqual(infinity, emqx_limiter_decimal:floor_div(infinity, 3)), + ?assertEqual(2, emqx_limiter_decimal:floor_div(7, 3)), + ok. + +t_schema_unit(_) -> + M = emqx_limiter_schema, + ?assertEqual(limiter, M:namespace()), + ?assertEqual({ok, infinity}, M:to_rate(" infinity ")), + ?assertMatch({ok, _}, M:to_rate("100")), + ?assertMatch({error, _}, M:to_rate("0")), + ?assertMatch({ok, _}, M:to_rate("100/10s")), + ?assertMatch({error, _}, M:to_rate("100/10x")), + ?assertEqual({ok, infinity}, M:to_capacity("infinity")), + ?assertEqual({ok, 100}, M:to_capacity("100")), + ?assertEqual({ok, 100 * 1024}, M:to_capacity("100KB")), + ?assertEqual({ok, 100 * 1024 * 1024}, M:to_capacity("100MB")), + ?assertEqual({ok, 100 * 1024 * 1024 * 1024}, M:to_capacity("100GB")), + ok. + %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- @@ -752,7 +572,6 @@ client_try_check(Need, #client{counter = Counter, end end. - %% XXX not a god test, because client's rate maybe bigger than global rate %% so if client' rate = infinity %% client's divisible should be true or capacity must be bigger than number of each consume @@ -769,25 +588,17 @@ to_rate(Str) -> {ok, Rate} = emqx_limiter_schema:to_rate(Str), Rate. -with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) -> - Path = [limiter, message_routing], - #{global := Global} = Cfg = emqx_config:get(Path), - Cfg2 = Cfg#{global := Modifier(Global)}, - with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case). +with_global(Modifier, BuckeTemps, Case) -> + Fun = fun(Cfg) -> + #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg), + Fun = fun({Name, BMod}, Acc) -> + Acc#{Name => BMod(BucketCfg)} + end, + Buckets = lists:foldl(Fun, #{}, BuckeTemps), + Cfg2#{bucket := Buckets} + end, -with_zone(Name, Modifier, Buckets, Case) -> - Path = [limiter, message_routing], - Cfg = emqx_config:get(Path), - with_zone(Cfg, Name, Modifier, Buckets, Case). - -with_zone(Cfg, Name, Modifier, Buckets, Case) -> - Path = [limiter, message_routing], - #{zone := ZoneCfgs, - bucket := BucketCfgs} = Cfg, - ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs), - BucketCfgs2 = apply_modifier(Buckets, BucketCfgs), - Cfg2 = Cfg#{zone := ZoneCfgs2, bucket := BucketCfgs2}, - with_config(Path, fun(_) -> Cfg2 end, Case). + with_config([limiter, message_routing], Fun, Case). with_bucket(Bucket, Modifier, Case) -> Path = [limiter, message_routing, bucket, Bucket], @@ -802,8 +613,8 @@ with_config(Path, Modifier, Case) -> NewCfg = Modifier(Cfg), ct:pal("test with config:~p~n", [NewCfg]), emqx_config:put(Path, NewCfg), - emqx_limiter_manager:restart_server(message_routing), - timer:sleep(100), + emqx_limiter_server:update_config(message_routing), + timer:sleep(500), DelayReturn = delay_return(Case), emqx_config:put(Path, Cfg), DelayReturn(). diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index a2f5c3502..455cf3e43 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -405,16 +405,30 @@ t_handle_timeout_emit_stats(_) -> ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)). t_ensure_rate_limit(_) -> + %% XXX In the future, limiter should provide API for config update + Path = [limiter, bytes_in, bucket, default, per_client], + PerClient = emqx_config:get(Path), + {ok, Rate}= emqx_limiter_schema:to_rate("50MB"), + emqx_config:put(Path, PerClient#{rate := Rate}), + emqx_limiter_server:update_config(bytes_in), + timer:sleep(100), + Limiter = init_limiter(), St = st(#{limiter => Limiter}), - {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), %% must bigger than value in emqx_ratelimit_SUITE + + %% must bigger than value in emqx_ratelimit_SUITE + {ok, Need} = emqx_limiter_schema:to_capacity("1GB"), St1 = ?ws_conn:check_limiter([{Need, bytes_in}], [], fun(_, _, S) -> S end, [], St), ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)), - ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)). + ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)), + + emqx_config:put(Path, PerClient), + emqx_limiter_server:update_config(bytes_in), + timer:sleep(100). t_parse_incoming(_) -> {Packets, St} = ?ws_conn:parse_incoming(<<48,3>>, [], st()), @@ -558,7 +572,7 @@ ws_client(State) -> ct:fail(ws_timeout) end. -limiter_cfg() -> #{}. +limiter_cfg() -> #{bytes_in => default, message_in => default}. init_limiter() -> emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 153ec139a..6121e39b6 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,8 +493,6 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; -typename_to_spec("bucket_path()", _Mod) -> - #{type => string, example => <<"groupName.bucketName">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index a1cc42898..c0e9ad42f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -198,14 +198,15 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) -> Mod = emqx_retainer:get_backend_module(), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> - {ok, Result} = Mod:read_message(Context, Topic), + {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), deliver(Result, Context, Pid, Topic, undefined, Limiter); true -> - {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), + {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), deliver(Result, Context, Pid, Topic, NewCursor, Limiter) end. --spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. +-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> + {ok, limiter()}. deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> {ok, Limiter}; diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index a9405373e..f143734fc 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -36,7 +36,7 @@ retainer { flow_control { batch_read_number = 0 batch_deliver_number = 0 - limiter_bucket_name = retainer + limiter.batch = retainer } backend { type = built_in_database @@ -281,12 +281,11 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, shared, bucket, retainer]), - RetainerCfg2 = RetainerCfg#{ - per_client := PerClient#{ - rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), - capacity := 1}}, - emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg2), + #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]), + RetainerCfg2 = RetainerCfg#{per_client := + PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), + capacity := 1}}, + emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), emqx_limiter_manager:restart_server(shared), timer:sleep(500), @@ -296,7 +295,7 @@ t_flow_control(_) -> emqx_retainer:update_config(#{<<"flow_control">> => #{<<"batch_read_number">> => 1, <<"batch_deliver_number">> => 1, - <<"limiter_bucket_name">> => retainer}}), + <<"limiter">> => #{<<"batch">> => retainer}}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -326,7 +325,7 @@ t_flow_control(_) -> ok = emqtt:disconnect(C1), %% recover the limiter - emqx_config:put([limiter, shared, bucket, retainer], RetainerCfg), + emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg), emqx_limiter_manager:restart_server(shared), timer:sleep(500), From beba7c9692294868c2d673d1826df207716a483a Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 18 Mar 2022 14:45:06 +0800 Subject: [PATCH 4/4] fix(limiter): improve code style and description --- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 12 ++-- .../emqx_limiter/src/emqx_limiter_schema.erl | 59 +++++++++++++++---- .../emqx_limiter/src/emqx_limiter_server.erl | 2 +- apps/emqx/src/emqx_schema.erl | 2 +- .../src/emqx_dashboard_swagger.erl | 2 + apps/emqx_retainer/etc/emqx_retainer.conf | 11 +++- .../src/emqx_retainer_dispatcher.erl | 8 +-- .../src/emqx_retainer_schema.erl | 2 +- .../test/emqx_retainer_SUITE.erl | 4 +- 9 files changed, 72 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 372944666..2a4e13731 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -43,7 +43,6 @@ %% retry contenxt %% undefined meaning no retry context or no need to retry - , retry_ctx => undefined | retry_context(token_bucket_limiter()) %% the retry context , atom => any() %% allow to add other keys @@ -108,8 +107,6 @@ -import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]). --elvis([{elvis_style, no_if_expression, disable}]). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -308,18 +305,19 @@ do_reset(Need, capacity := Capacity} = Limiter) -> Now = ?NOW, Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), - Available = erlang:floor(Tokens2), - if Available >= Need -> + + case erlang:floor(Tokens2) of + Available when Available >= Need -> Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now}, do_check_with_parent_limiter(Need, Limiter2); - Divisible andalso Available > 0 -> + Available when Divisible andalso Available > 0 -> %% must be allocated here, because may be Need > Capacity return_pause(Rate, partial, fun do_reset/2, Need - Available, Limiter#{tokens := 0, lasttime := Now}); - true -> + _ -> return_pause(Rate, pause, fun do_reset/2, Need, Limiter) end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index b9b25e806..3f313e7e4 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -53,9 +53,10 @@ , capacity/0 , initial/0 , failure_strategy/0 + , bucket_name/0 ]). --export_type([limiter_type/0, bucket_name/0, bucket_path/0]). +-export_type([limiter_type/0, bucket_path/0]). -import(emqx_schema, [sc/2, map/2]). -define(UNIT_TIME_IN_MS, 1000). @@ -65,24 +66,58 @@ namespace() -> limiter. roots() -> [limiter]. fields(limiter) -> - [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})} - , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})} - , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})} - , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})} + [ {bytes_in, sc(ref(limiter_opts), + #{description => + <<"The bytes_in limiter.
" + "It is used to limit the inbound bytes rate for this EMQX node." + "If the this limiter limit is reached," + "the restricted client will be slow down even be hung for a while.">> + })} + , {message_in, sc(ref(limiter_opts), + #{description => + <<"The message_in limiter.
" + "This is used to limit the inbound message numbers for this EMQX node" + "If the this limiter limit is reached," + "the restricted client will be slow down even be hung for a while.">> + })} + , {connection, sc(ref(limiter_opts), + #{description => + <<"The connection limiter.
" + "This is used to limit the connection rate for this EMQX node" + "If the this limiter limit is reached," + "New connections will be refused" + >>})} + , {message_routing, sc(ref(limiter_opts), + #{description => + <<"The message_routing limiter.
" + "This is used to limite the deliver rate for this EMQX node" + "If the this limiter limit is reached," + "New publish will be refused" + >> + })} , {batch, sc(ref(limiter_opts), - #{description => <<"Internal batch operation limiter">>})} + #{description => <<"The batch limiter.
" + "This is used for EMQX internal batch operation" + "e.g. limite the retainer's deliver rate" + >> + })} ]; fields(limiter_opts) -> - [ {rate, sc(rate(), #{default => "infinity"})} - , {burst, sc(burst_rate(), #{default => "0/0s"})} - , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} + [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})} + , {burst, sc(burst_rate(), + #{default => "0/0s", + desc => "The burst, This value is based on rate." + "this value + rate = the maximum limit that can be achieved when limiter burst" + })} + , {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})} ]; fields(bucket_opts) -> - [ {rate, sc(rate(), #{})} - , {capacity, sc(capacity(), #{})} - , {initial, sc(initial(), #{default => "0"})} + [ {rate, sc(rate(), #{desc => "The rate for this bucket"})} + , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})} + , {initial, sc(initial(), #{default => "0", + desc => "The initial number of tokens for this bucket"})} , {per_client, sc(ref(client_bucket), #{default => #{}, desc => "The rate limit for each user of the bucket," diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 896792a32..84688ba38 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -1,4 +1,4 @@ -%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index a450cc245..35012acc7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit's type", atom()), #{default => #{}})} + sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6121e39b6..093c6fcc4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; +typename_to_spec("bucket_name()", _Mod) -> + #{type => string, example => <<"retainer">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 05cc1dcd0..051f44940 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -57,8 +57,15 @@ retainer { ## Default: 0 batch_deliver_number = 0 - ## deliver limiter bucket - limiter.batch = retainer + ## The rate limiter name for retained messages delivery. + ## In order to avoid delivering too many messages to the client at once, which may cause the client + ## to block or crash, or message dropped due to exceeding the size of the message queue. We need + ## to specify a rate limiter for the retained messages delivery to the client. + ## + ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`. + ## You can remove this field if you don't want any limit + ## Default: retainer + batch_deliver_limiter = retainer } ## Maximum retained message size. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index c0e9ad42f..6eb5457b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -85,8 +85,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), - Limiter = emqx_limiter_server:connect(batch, LimiterCfg), + BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), + Limiter = emqx_limiter_server:connect(batch, BucketName), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {noreply, State#{limiter := Limiter2}}; handle_cast(refresh_limiter, State) -> - LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), - Limiter = emqx_limiter_server:connect(batch, LimiterCfg), + BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), + Limiter = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 492a89021..12189c737 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -29,7 +29,7 @@ fields(mnesia_config) -> fields(flow_control) -> [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} , {batch_deliver_number, sc(range(0, 1000), 0)} - , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})} + , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)} ]. %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index f143734fc..9e957f214 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -36,7 +36,7 @@ retainer { flow_control { batch_read_number = 0 batch_deliver_number = 0 - limiter.batch = retainer + batch_deliver_limiter = retainer } backend { type = built_in_database @@ -295,7 +295,7 @@ t_flow_control(_) -> emqx_retainer:update_config(#{<<"flow_control">> => #{<<"batch_read_number">> => 1, <<"batch_deliver_number">> => 1, - <<"limiter">> => #{<<"batch">> => retainer}}}), + <<"batch_deliver_limiter">> => retainer}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish(