From 32030c8369a0850d5081b68558e8d0ee9901e4bf Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 11 Mar 2022 18:04:35 +0800 Subject: [PATCH] feat(limiter): remove the group(zone) level --- .../src/emqx_limiter/etc/emqx_limiter.conf | 36 +- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 13 +- .../src/emqx_limiter_container.erl | 8 +- .../emqx_limiter/src/emqx_limiter_manager.erl | 30 +- .../emqx_limiter/src/emqx_limiter_schema.erl | 55 +-- .../emqx_limiter/src/emqx_limiter_server.erl | 400 ++++++------------ apps/emqx/src/emqx_schema.erl | 2 +- apps/emqx_retainer/etc/emqx_retainer.conf | 4 +- .../src/emqx_retainer_dispatcher.erl | 8 +- .../src/emqx_retainer_schema.erl | 2 +- 10 files changed, 190 insertions(+), 368 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index d6edc7dd2..a29903205 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -5,46 +5,38 @@ limiter { ## rate limiter for message publish bytes_in { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## rate limiter for message publish message_in { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## connection rate limiter connection { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } ## rate limiter for message deliver message_routing { - group.default { - bucket.default { - rate = infinity - capacity = infinity - } + bucket.default { + rate = infinity + capacity = infinity } } - ## Some functions that don't need to use global and zone scope, them can shared use this type - shared { + ## rate limiter for internal batch operation + batch { bucket.retainer { rate = infinity capacity = infinity diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 45256c00b..74cd62833 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -28,18 +28,23 @@ -export_type([token_bucket_limiter/0]). %% a token bucket limiter with a limiter server's bucket reference --type token_bucket_limiter() :: #{ tokens := non_neg_integer() %% the number of tokens currently available +-type token_bucket_limiter() :: #{ %% the number of tokens currently available + tokens := non_neg_integer() , rate := decimal() , capacity := decimal() , lasttime := millisecond() - , max_retry_time := non_neg_integer() %% @see emqx_limiter_schema - , failure_strategy := failure_strategy() %% @see emqx_limiter_schema + %% @see emqx_limiter_schema + , max_retry_time := non_neg_integer() + %% @see emqx_limiter_schema + , failure_strategy := failure_strategy() , divisible := boolean() %% @see emqx_limiter_schema , low_water_mark := non_neg_integer() %% @see emqx_limiter_schema , bucket := bucket() %% the limiter server's bucket %% retry contenxt - , retry_ctx => undefined %% undefined meaning there is no retry context or no need to retry + %% undefined meaning no retry context or no need to retry + + , retry_ctx => undefined | retry_context(token_bucket_limiter()) %% the retry context , atom => any() %% allow to add other keys }. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 998bd9432..395174448 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -30,7 +30,10 @@ -export_type([container/0, check_result/0]). -type container() :: #{ limiter_type() => undefined | limiter() - , retry_key() => undefined | retry_context() | future() %% the retry context of the limiter + %% the retry context of the limiter + , retry_key() => undefined + | retry_context() + | future() , retry_ctx := undefined | any() %% the retry context of the container }. @@ -62,7 +65,8 @@ new(Types) -> %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter %% @end --spec get_limiter_by_names(list(limiter_type()), #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). +-spec get_limiter_by_names(list(limiter_type()), + #{limiter_type() => emqx_limiter_schema:bucket_name()}) -> container(). get_limiter_by_names(Types, BucketNames) -> Init = fun(Type, Acc) -> Limiter = emqx_limiter_server:connect(Type, BucketNames), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 8295fa5d5..9cf4bc1d0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -24,7 +24,8 @@ %% API -export([ start_link/0, start_server/1, find_bucket/1 , find_bucket/2, insert_bucket/2, insert_bucket/3 - , make_path/2, make_path/3, restart_server/1]). + , make_path/2, restart_server/1 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -34,9 +35,7 @@ -type path() :: list(atom()). -type limiter_type() :: emqx_limiter_schema:limiter_type(). --type zone_name() :: emqx_limiter_schema:zone_name(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). --type bucket_path() :: emqx_limiter_schema:bucket_path(). %% counter record in ets table -record(bucket, { path :: path() @@ -58,10 +57,10 @@ start_server(Type) -> restart_server(Type) -> emqx_limiter_server_sup:restart(Type). --spec find_bucket(limiter_type(), bucket_path()) -> +-spec find_bucket(limiter_type(), bucket_name()) -> {ok, bucket_ref()} | undefined. -find_bucket(Type, BucketPath) -> - find_bucket(make_path(Type, BucketPath)). +find_bucket(Type, BucketName) -> + find_bucket(make_path(Type, BucketName)). -spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. find_bucket(Path) -> @@ -73,26 +72,19 @@ find_bucket(Path) -> end. -spec insert_bucket(limiter_type(), - bucket_path(), + bucket_name(), bucket_ref()) -> boolean(). -insert_bucket(Type, BucketPath, Bucket) -> - inner_insert_bucket(make_path(Type, BucketPath), Bucket). +insert_bucket(Type, BucketName, Bucket) -> + inner_insert_bucket(make_path(Type, BucketName), Bucket). -spec insert_bucket(path(), bucket_ref()) -> true. insert_bucket(Path, Bucket) -> inner_insert_bucket(Path, Bucket). --spec make_path(limiter_type(), bucket_path()) -> path(). -make_path(Type, BucketPath) -> - [Type | BucketPath]. - --spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). -make_path(shared, _GroupName, BucketName) -> - [shared, BucketName]; - -make_path(Type, GroupName, BucketName) -> - [Type, GroupName, BucketName]. +-spec make_path(limiter_type(), bucket_name()) -> path(). +make_path(Type, BucketName) -> + [Type | BucketName]. %%-------------------------------------------------------------------- %% @doc diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 77cf9cb75..c1ca45e1b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -20,7 +20,8 @@ -export([ roots/0, fields/1, to_rate/1, to_capacity/1 , minimum_period/0, to_burst_rate/1, to_initial/1 - , namespace/0, to_bucket_path/1, default_group_name/0]). + , namespace/0, get_bucket_cfg_path/2 + ]). -define(KILOBYTE, 1024). @@ -31,7 +32,6 @@ | shared. -type bucket_name() :: atom(). --type zone_name() :: atom(). -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). -type capacity() :: infinity | number(). %% the capacity of the token bucket @@ -47,17 +47,15 @@ -typerefl_from_string({burst_rate/0, ?MODULE, to_burst_rate}). -typerefl_from_string({capacity/0, ?MODULE, to_capacity}). -typerefl_from_string({initial/0, ?MODULE, to_initial}). --typerefl_from_string({bucket_path/0, ?MODULE, to_bucket_path}). -reflect_type([ rate/0 , burst_rate/0 , capacity/0 , initial/0 , failure_strategy/0 - , bucket_path/0 ]). --export_type([limiter_type/0, bucket_name/0, zone_name/0]). +-export_type([limiter_type/0, bucket_name/0, bucket_path/0]). -import(emqx_schema, [sc/2, map/2]). -define(UNIT_TIME_IN_MS, 1000). @@ -67,38 +65,24 @@ namespace() -> limiter. roots() -> [limiter]. fields(limiter) -> - [ {bytes_in, sc(ref(limiter_opts), #{})} - , {message_in, sc(ref(limiter_opts), #{})} - , {connection, sc(ref(limiter_opts), #{})} - , {message_routing, sc(ref(limiter_opts), #{})} - , {shared, sc(ref(shared_opts), - #{description => - <<"Some functions that do not need to use global and zone scope," - "them can shared use this type">>})} + [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})} + , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})} + , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})} + , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})} + , {batch, sc(ref(limiter_opts), + #{description => <<"Internal batch operation limiter">>})} ]; fields(limiter_opts) -> - fields(rate_burst) ++ %% the node global limit - [ {group, sc(map("group name", ref(group_opts)), #{})} - ]; - -fields(group_opts) -> - fields(rate_burst) ++ %% the group limite - [ {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} - ]; - -fields(rate_burst) -> [ {rate, sc(rate(), #{default => "infinity"})} , {burst, sc(burst_rate(), #{default => "0/0s"})} + , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} ]; -fields(shared_opts) -> - [{bucket, sc(map("bucket name", ref(bucket_opts)), #{})}]; - fields(bucket_opts) -> [ {rate, sc(rate(), #{})} - , {initial, sc(initial(), #{default => "0"})} , {capacity, sc(capacity(), #{})} + , {initial, sc(initial(), #{default => "0"})} , {per_client, sc(ref(client_bucket), #{default => #{}, desc => "The rate limit for each user of the bucket," @@ -137,9 +121,9 @@ minimum_period() -> to_rate(Str) -> to_rate(Str, true, false). -%% default group name for shared type limiter -default_group_name() -> - '_default'. +-spec get_bucket_cfg_path(limiter_type(), bucket_name()) -> bucket_path(). +get_bucket_cfg_path(Type, BucketName) -> + [limiter, Type, bucket, BucketName]. %%-------------------------------------------------------------------- %% Internal functions @@ -213,14 +197,3 @@ apply_unit("kb", Val) -> Val * ?KILOBYTE; apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). - -to_bucket_path(Str) -> - Dirs = [erlang:list_to_atom(string:trim(T)) || T <- string:tokens(Str, ".")], - case Dirs of - [_Group, _Bucket] = Path -> - {ok, Path}; - [_Bucket] = Path -> - {ok, Path}; - _ -> - {error, Str} - end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index b456d7046..95c8a47ec 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -34,28 +34,16 @@ terminate/2, code_change/3, format_status/2]). -export([ start_link/1, connect/2, info/1 - , name/1, get_initial_val/1]). + , name/1, get_initial_val/1 + ]). -type root() :: #{ rate := rate() %% number of tokens generated per period , burst := rate() , period := pos_integer() %% token generation interval(second) - , childs := list(node_id()) %% node children , consumed := non_neg_integer() }. --type zone() :: #{ id := node_id() - , name := zone_name() - , rate := rate() - , burst := rate() - , obtained := non_neg_integer() %% number of tokens obtained - , childs := list(node_id()) - }. - --type bucket() :: #{ id := node_id() - , name := bucket_name() - %% pointer to zone node, use for burst - %% it also can use nodeId, nodeId is more direct, but nodeName is clearer - , zone := zone_name() +-type bucket() :: #{ name := bucket_name() , rate := rate() , obtained := non_neg_integer() , correction := emqx_limiter_decimal:zero_or_float() %% token correction value @@ -64,19 +52,14 @@ , index := undefined | index() }. --type state() :: #{ root := undefined | root() +-type state() :: #{ type := limiter_type() + , root := undefined | root() + , buckets := buckets() , counter := undefined | counters:counters_ref() %% current counter to alloc , index := index() - , zones := #{zone_name() => node_id()} - , buckets := list(node_id()) - , nodes := nodes() - , type := limiter_type() }. --type node_id() :: pos_integer(). --type node_data() :: zone() | bucket(). --type nodes() :: #{node_id() => node_data()}. --type zone_name() :: emqx_limiter_schema:zone_name(). +-type buckets() :: #{bucket_name() => bucket()}. -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). -type rate() :: decimal(). @@ -84,11 +67,10 @@ -type capacity() :: decimal(). -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). --type bucket_path() :: emqx_limiter_schema:bucket_path(). -define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter --define(CURRYING(X, Fun2), fun(Y) -> Fun2(X, Y) end). +-define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -99,28 +81,22 @@ %% API %%-------------------------------------------------------------------- -spec connect(limiter_type(), - bucket_path() | #{limiter_type() => bucket_path() | undefined}) -> + bucket_name() | #{limiter_type() => bucket_name() | undefined}) -> emqx_htb_limiter:limiter(). %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> emqx_htb_limiter:make_infinity_limiter(undefined); -%% Shared type can use bucket name directly -connect(shared, BucketName) when is_atom(BucketName) -> - connect(shared, [BucketName]); - -connect(Type, BucketPath) when is_list(BucketPath) -> - FullPath = get_bucket_full_cfg_path(Type, BucketPath), - case emqx:get_config(FullPath, undefined) of - undefined -> - io:format(">>>>> config:~p~n fullpath:~p~n", [emqx:get_config([limiter]), FullPath]), - io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), - ?SLOG(error, #{msg => "bucket_config_not_found", path => BucketPath}), - throw("bucket's config not found"); - #{rate := AggrRate, - capacity := AggrSize, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> - case emqx_limiter_manager:find_bucket(Type, BucketPath) of +connect(Type, BucketName) when is_atom(BucketName) -> + CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), + case emqx:get_config(CfgPath, undefined) of + undefined -> + ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}), + throw("bucket's config not found"); + #{rate := AggrRate, + capacity := AggrSize, + per_client := #{rate := CliRate, capacity := CliSize} = Cfg} -> + case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> if CliRate < AggrRate orelse CliSize < AggrSize -> emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); @@ -130,8 +106,7 @@ connect(Type, BucketPath) when is_list(BucketPath) -> emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) end; undefined -> - io:format(">>>>> ets:~p~n", [ets:tab2list(emqx_limiter_counters)]), - ?SLOG(error, #{msg => "bucket_not_found", path => BucketPath}), + ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}), throw("invalid bucket") end end; @@ -172,13 +147,12 @@ start_link(Type) -> {stop, Reason :: term()} | ignore. init([Type]) -> - State = #{root => undefined, - counter => undefined, - index => 1, - zones => #{}, - nodes => #{}, - buckets => [], - type => Type}, + State = #{ type => Type + , root => undefined + , counter => undefined + , index => 1 + , buckets => #{} + }, State2 = init_tree(Type, State), #{root := #{period := Perido}} = State2, oscillate(Perido), @@ -289,59 +263,38 @@ oscillate(Interval) -> -spec oscillation(state()) -> state(). oscillation(#{root := #{rate := Flow, period := Interval, - childs := ChildIds, consumed := Consumed} = Root, - nodes := Nodes} = State) -> + buckets := Buckets} = State) -> oscillate(Interval), - Childs = get_ordered_childs(ChildIds, Nodes), - {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), - maybe_burst(State#{nodes := Nodes2, + Ordereds = get_ordered_buckets(Buckets), + {Alloced, Buckets2} = transverse(Ordereds, Flow, 0, Buckets), + maybe_burst(State#{buckets := Buckets2, root := Root#{consumed := Consumed + Alloced}}). %% @doc horizontal spread --spec transverse(list(node_data()), +-spec transverse(list(bucket()), flow(), non_neg_integer(), - nodes()) -> {non_neg_integer(), nodes()}. -transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 -> - {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes), - InFlow2 = sub(InFlow, NodeAlloced), - Alloced2 = Alloced + NodeAlloced, - transverse(T, InFlow2, Alloced2, Nodes2); + buckets()) -> {non_neg_integer(), buckets()}. +transverse([H | T], InFlow, Alloced, Buckets) when InFlow > 0 -> + {BucketAlloced, Buckets2} = longitudinal(H, InFlow, Buckets), + InFlow2 = sub(InFlow, BucketAlloced), + Alloced2 = Alloced + BucketAlloced, + transverse(T, InFlow2, Alloced2, Buckets2); -transverse(_, _, Alloced, Nodes) -> - {Alloced, Nodes}. +transverse(_, _, Alloced, Buckets) -> + {Alloced, Buckets}. %% @doc vertical spread --spec longitudinal(node_data(), flow(), nodes()) -> - {non_neg_integer(), nodes()}. -longitudinal(#{id := Id, - rate := Rate, - obtained := Obtained, - childs := ChildIds} = Node, InFlow, Nodes) -> - Flow = erlang:min(InFlow, Rate), - - if Flow > 0 -> - Childs = get_ordered_childs(ChildIds, Nodes), - {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), - if Alloced > 0 -> - {Alloced, - Nodes2#{Id => Node#{obtained := Obtained + Alloced}}}; - true -> - %% childs are empty or all counter childs are full - {0, Nodes2} - end; - true -> - {0, Nodes} - end; - -longitudinal(#{id := Id, +-spec longitudinal(bucket(), flow(), buckets()) -> + {non_neg_integer(), buckets()}. +longitudinal(#{name := Name, rate := Rate, capacity := Capacity, counter := Counter, index := Index, - obtained := Obtained} = Node, - InFlow, Nodes) when Counter =/= undefined -> + obtained := Obtained} = Bucket, + InFlow, Buckets) when Counter =/= undefined -> Flow = erlang:min(InFlow, Rate), ShouldAlloc = @@ -361,224 +314,149 @@ longitudinal(#{id := Id, %% XXX if capacity is infinity, and flow always > 0, the value in %% counter will be overflow at some point in the future, do we need %% to deal with this situation??? - {Inc, Node2} = emqx_limiter_correction:add(Available, Node), + {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), counters:add(Counter, Index, Inc), {Inc, - Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; + Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}}; _ -> - {0, Nodes} + {0, Buckets} end; -longitudinal(_, _, Nodes) -> - {0, Nodes}. +longitudinal(_, _, Buckets) -> + {0, Buckets}. --spec get_ordered_childs(list(node_id()), nodes()) -> list(node_data()). -get_ordered_childs(Ids, Nodes) -> - Childs = [maps:get(Id, Nodes) || Id <- Ids], +-spec get_ordered_buckets(list(bucket()) | buckets()) -> list(bucket()). +get_ordered_buckets(Buckets) when is_map(Buckets) -> + BucketList = maps:values(Buckets), + get_ordered_buckets(BucketList); +get_ordered_buckets(Buckets) -> %% sort by obtained, avoid node goes hungry lists:sort(fun(#{obtained := A}, #{obtained := B}) -> A < B end, - Childs). + Buckets). -spec maybe_burst(state()) -> state(). maybe_burst(#{buckets := Buckets, - zones := Zones, - root := #{burst := Burst}, - nodes := Nodes} = State) when Burst > 0 -> - %% find empty buckets and group by zone name - GroupFun = fun(Id, Groups) -> - %% TODO filter undefined counter - #{counter := Counter, - index := Index, - zone := Zone} = maps:get(Id, Nodes), - case counters:get(Counter, Index) of - Any when Any =< 0 -> - Group = maps:get(Zone, Groups, []), - maps:put(Zone, [Id | Group], Groups); - _ -> - Groups - end - end, + root := #{burst := Burst}} = State) when Burst > 0 -> + Fold = fun(_Name, #{counter := Cnt, index := Idx} = Bucket, Acc) when Cnt =/= undefined -> + case counters:get(Cnt, Idx) > 0 of + true -> + Acc; + false -> + [Bucket | Acc] + end; + (_Name, _Bucket, Acc) -> + Acc + end, - case lists:foldl(GroupFun, #{}, Buckets) of - Groups when map_size(Groups) > 0 -> - %% remove the zone which don't support burst - Filter = fun({Name, Childs}, Acc) -> - ZoneId = maps:get(Name, Zones), - #{burst := ZoneBurst} = Zone = maps:get(ZoneId, Nodes), - case ZoneBurst > 0 of - true -> - [{Zone, Childs} | Acc]; - _ -> - Acc - end - end, - - FilterL = lists:foldl(Filter, [], maps:to_list(Groups)), - dispatch_burst(FilterL, State); - _ -> - State - end; + Empties = maps:fold(Fold, [], Buckets), + dispatch_burst(Empties, Burst, State); maybe_burst(State) -> State. --spec dispatch_burst(list({zone(), list(node_id())}), state()) -> state(). -dispatch_burst([], State) -> +-spec dispatch_burst(list(bucket()), non_neg_integer(), state()) -> state(). +dispatch_burst([], _, State) -> State; -dispatch_burst(GroupL, - #{root := #{burst := Burst}, - nodes := Nodes} = State) -> - InFlow = Burst / erlang:length(GroupL), - Dispatch = fun({Zone, Childs}, NodeAcc) -> - #{id := ZoneId, - burst := ZoneBurst, - obtained := Obtained} = Zone, +dispatch_burst(Empties, InFlow, #{consumed := Consumed, buckets := Buckets} = State) -> + EachFlow = InFlow / erlang:length(Empties), + {Alloced, Buckets2} = dispatch_burst_to_buckets(Empties, EachFlow, 0, Buckets), + State#{consumed := Consumed + Alloced, buckets := Buckets2}. - case erlang:min(InFlow, ZoneBurst) of - 0 -> NodeAcc; - ZoneFlow -> - EachFlow = ZoneFlow / erlang:length(Childs), - {Alloced, NodeAcc2} = - dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), - Zone2 = Zone#{obtained := Obtained + Alloced}, - NodeAcc2#{ZoneId := Zone2} - end - end, - State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. - --spec dispatch_burst_to_buckets(list(node_id()), +-spec dispatch_burst_to_buckets(list(bucket()), float(), - non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. -dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> - #{counter := Counter, + non_neg_integer(), buckets()) -> {non_neg_integer(), buckets()}. +dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> + #{name := Name, + counter := Counter, index := Index, - obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), + obtained := Obtained} = Bucket, {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), counters:add(Counter, Index, Inc), - Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, - dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); + Buckets2 = Buckets#{Name := Bucket2#{obtained := Obtained + Inc}}, + dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Buckets2); -dispatch_burst_to_buckets([], _, Alloced, Nodes) -> - {Alloced, Nodes}. +dispatch_burst_to_buckets([], _, Alloced, Buckets) -> + {Alloced, Buckets}. -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) -> - Cfg = emqx:get_config([limiter, Type]), - GlobalCfg = maps:merge(#{rate => infinity, burst => 0}, Cfg), - case GlobalCfg of - #{group := Group} -> ok; - #{bucket := _} -> - Group = make_shared_default_group(GlobalCfg), - ok - end, + #{bucket := Buckets} = Cfg = emqx:get_config([limiter, Type]), + {Factor, Root} = make_root(Cfg), + {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, Factor, 1, []), - {Factor, Root} = make_root(GlobalCfg), - {Zones, Nodes, DelayBuckets} = make_zone(maps:to_list(Group), Type, - GlobalCfg, Factor, 1, #{}, #{}, #{}), - - State2 = State#{root := Root#{childs := maps:values(Zones)}, - zones := Zones, - nodes := Nodes, - buckets := maps:keys(DelayBuckets), - counter := counters:new(maps:size(DelayBuckets), [write_concurrency]) + State2 = State#{root := Root, + counter := counters:new(CounterNum, [write_concurrency]) }, - lists:foldl(fun(F, Acc) -> F(Acc) end, State2, maps:values(DelayBuckets)). + lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets). -spec make_root(hocons:confg()) -> {number(), root()}. +make_root(#{rate := Rate, burst := Burst}) when Rate >= 1 -> + {1, #{rate => Rate, + burst => Burst, + period => emqx_limiter_schema:minimum_period(), + consumed => 0}}; + make_root(#{rate := Rate, burst := Burst}) -> MiniPeriod = emqx_limiter_schema:minimum_period(), - if Rate >= 1 -> - {1, #{rate => Rate, - burst => Burst, - period => MiniPeriod, - childs => [], - consumed => 0}}; - true -> - Factor = 1 / Rate, - {Factor, #{rate => 1, - burst => Burst * Factor, - period => erlang:floor(Factor * MiniPeriod), - childs => [], - consumed => 0}} - end. + Factor = 1 / Rate, + {Factor, #{rate => 1, + burst => Burst * Factor, + period => erlang:floor(Factor * MiniPeriod), + consumed => 0}}. -make_zone([{Name, ZoneCfg} | T], Type, GlobalCfg, Factor, NodeId, Zones, Nodes, DelayBuckets) -> - #{rate := Rate, burst := Burst, bucket := BucketMap} = ZoneCfg, - BucketCfgs = maps:to_list(BucketMap), - - FirstChildId = NodeId + 1, - Buckets = make_bucket(BucketCfgs, Type, GlobalCfg, Name, ZoneCfg, Factor, FirstChildId, #{}), - ChildNum = maps:size(Buckets), - NextZoneId = FirstChildId + ChildNum, - - Zone = #{id => NodeId, - name => Name, - rate => mul(Rate, Factor), - burst => Burst, - obtained => 0, - childs => maps:keys(Buckets) - }, - - make_zone(T, Type, GlobalCfg, Factor, NextZoneId, - Zones#{Name => NodeId}, Nodes#{NodeId => Zone}, maps:merge(DelayBuckets, Buckets) - ); - -make_zone([], _Type, _Global, _Factor, _NodeId, Zones, Nodes, DelayBuckets) -> - {Zones, Nodes, DelayBuckets}. - -make_bucket([{Name, Conf} | T], Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id, DelayBuckets) -> - Path = emqx_limiter_manager:make_path(Type, ZoneName, Name), - case get_counter_rate(Conf, ZoneCfg, GlobalCfg) of +make_bucket([{Name, Conf} | T], Type, GlobalCfg, Factor, CounterNum, DelayBuckets) -> + Path = emqx_limiter_manager:make_path(Type, Name), + case get_counter_rate(Conf, GlobalCfg) of infinity -> Rate = infinity, Capacity = infinity, Ref = emqx_limiter_bucket_ref:new(undefined, undefined, Rate), emqx_limiter_manager:insert_bucket(Path, Ref), - InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> - State#{nodes := Nodes#{NodeId => Node}} + CounterNum2 = CounterNum, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> + State#{buckets := Buckets#{BucketName => Bucket}} end; RawRate -> #{capacity := Capacity} = Conf, Initial = get_initial_val(Conf), Rate = mul(RawRate, Factor), - - InitFun = fun(#{id := NodeId} = Node, #{nodes := Nodes} = State) -> + CounterNum2 = CounterNum + 1, + InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> {Counter, Idx, State2} = alloc_counter(Path, RawRate, Initial, State), - Node2 = Node#{counter := Counter, index := Idx}, - State2#{nodes := Nodes#{NodeId => Node2}} + Bucket2 = Bucket#{counter := Counter, index := Idx}, + State2#{buckets := Buckets#{BucketName => Bucket2}} end end, - Node = #{ id => Id - , name => Name - , zone => ZoneName - , rate => Rate - , obtained => 0 - , correction => 0 - , capacity => Capacity - , counter => undefined - , index => undefined}, + Bucket = #{ name => Name + , rate => Rate + , obtained => 0 + , correction => 0 + , capacity => Capacity + , counter => undefined + , index => undefined}, - DelayInit = ?CURRYING(Node, InitFun), + DelayInit = ?CURRYING(Bucket, InitFun), make_bucket(T, - Type, GlobalCfg, ZoneName, ZoneCfg, Factor, Id + 1, DelayBuckets#{Id => DelayInit}); + Type, GlobalCfg, Factor, CounterNum2, [DelayInit | DelayBuckets]); -make_bucket([], _Type, _Global, _ZoneName, _Zone, _Factor, _Id, DelayBuckets) -> - DelayBuckets. +make_bucket([], _Type, _Global, _Factor, CounterNum, DelayBuckets) -> + {CounterNum, DelayBuckets}. -spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> {counters:counters_ref(), pos_integer(), state()}. alloc_counter(Path, Rate, Initial, #{counter := Counter, index := Index} = State) -> + case emqx_limiter_manager:find_bucket(Path) of {ok, #{counter := ECounter, index := EIndex}} when ECounter =/= undefined -> @@ -594,22 +472,14 @@ init_counter(Path, Counter, Index, Rate, Initial, State) -> emqx_limiter_manager:insert_bucket(Path, Ref), {Counter, Index, State}. + %% @doc find first limited node -get_counter_rate(BucketCfg, ZoneCfg, GlobalCfg) -> - Search = lists:search(fun(E) -> is_limited(E) end, - [BucketCfg, ZoneCfg, GlobalCfg]), - case Search of - {value, #{rate := Rate}} -> - Rate; - false -> - infinity - end. +get_counter_rate(#{rate := Rate, capacity := Capacity}, _GlobalCfg) + when Rate =/= infinity orelse Capacity =/= infinity -> %% TODO maybe no need to check capacity + Rate; -is_limited(#{rate := Rate, capacity := Capacity}) -> - Rate =/= infinity orelse Capacity =/= infinity; - -is_limited(#{rate := Rate}) -> - Rate =/= infinity. +get_counter_rate(_Cfg, #{rate := Rate}) -> + Rate. -spec get_initial_val(hocons:config()) -> decimal(). get_initial_val(#{initial := Initial, @@ -625,15 +495,3 @@ get_initial_val(#{initial := Initial, true -> 0 end. - --spec make_shared_default_group(hocons:config()) -> honcs:config(). -make_shared_default_group(Cfg) -> - GroupName = emqx_limiter_schema:default_group_name(), - #{GroupName => Cfg#{rate => infinity, burst => 0}}. - --spec get_bucket_full_cfg_path(limiter_type(), bucket_path()) -> list(atom()). -get_bucket_full_cfg_path(shared, [BucketName]) -> - [limiter, shared, bucket, BucketName]; - -get_bucket_full_cfg_path(Type, [GroupName, BucketName]) -> - [limiter, Type, group, GroupName, bucket, BucketName]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e4da8a5cc..a450cc245 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit bucket's name", emqx_limiter_schema:bucket_path()), #{default => #{}})} + sc(map("ratelimit's type", atom()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index e561bc52f..05cc1dcd0 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -58,9 +58,7 @@ retainer { batch_deliver_number = 0 ## deliver limiter bucket - ## - ## Default: 0s - limiter_bucket_name = retainer + limiter.batch = retainer } ## Maximum retained message size. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index d57c49799..a1cc42898 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -85,8 +85,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), - Limiter = emqx_limiter_server:connect(shared, Bucket), + LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), + Limiter = emqx_limiter_server:connect(batch, LimiterCfg), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {noreply, State#{limiter := Limiter2}}; handle_cast(refresh_limiter, State) -> - Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]), - Limiter = emqx_limiter_server:connect(shared, Bucket), + LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), + Limiter = emqx_limiter_server:connect(batch, LimiterCfg), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 395f0e363..492a89021 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -29,7 +29,7 @@ fields(mnesia_config) -> fields(flow_control) -> [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} , {batch_deliver_number, sc(range(0, 1000), 0)} - , {limiter_bucket_name, sc(atom(), retainer)} + , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})} ]. %%--------------------------------------------------------------------