diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl new file mode 100644 index 000000000..a92041b00 --- /dev/null +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_correction). + +%% API +-export([ add/2 ]). + +-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float() + , any() => any() + }. + +-export_type([correction_value/0]). + +%%-------------------------------------------------------------------- +%%% API +%%-------------------------------------------------------------------- +-spec add(number(), correction_value()) -> {integer(), correction_value()}. +add(Inc, #{correction := Correction} = Data) -> + FixedInc = Inc + Correction, + IntInc = erlang:floor(FixedInc), + {IntInc, Data#{correction := FixedInc - IntInc}}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl index 28b6f3385..e1fdeedcc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl @@ -20,7 +20,7 @@ %% API -export([ add/2, sub/2, mul/2 - , add_to_counter/3, put_to_counter/3, floor_div/2]). + , put_to_counter/3, floor_div/2]). -export_type([decimal/0, zero_or_float/0]). -type decimal() :: infinity | number(). @@ -60,22 +60,6 @@ floor_div(infinity, _) -> floor_div(A, B) -> erlang:floor(A / B). --spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> - {zero_or_float(), zero_or_float()}. -add_to_counter(_, _, infinity) -> - {0, 0}; -add_to_counter(Counter, Index, Val) when is_float(Val) -> - IntPart = erlang:floor(Val), - if IntPart > 0 -> - counters:add(Counter, Index, IntPart); - true -> - ok - end, - {IntPart, Val - IntPart}; -add_to_counter(Counter, Index, Val) -> - counters:add(Counter, Index, Val), - {Val, 0}. - -spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok. put_to_counter(_, _, infinity) -> ok; diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 799d623bf..ee2ed3431 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -87,7 +87,7 @@ -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter -export_type([index/0]). --import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]). +-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). %%-------------------------------------------------------------------- %% API @@ -317,12 +317,11 @@ longitudinal(#{id := Id, longitudinal(#{id := Id, rate := Rate, capacity := Capacity, - correction := Correction, counter := Counter, index := Index, obtained := Obtained} = Node, InFlow, Nodes) when Counter =/= undefined -> - Flow = add(erlang:min(InFlow, Rate), Correction), + Flow = erlang:min(InFlow, Rate), ShouldAlloc = case counters:get(Counter, Index) of @@ -340,11 +339,11 @@ longitudinal(#{id := Id, Avaiable when Avaiable > 0 -> %% XXX if capacity is infinity, and flow always > 0, the value in counter %% will be overflow at some point in the future, do we need to deal with this situation??? - {Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable), + {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), + counters:add(Counter, Index, Inc), - {Alloced, - Nodes#{Id := Node#{obtained := Obtained + Alloced, - correction := Decimal}}}; + {Inc, + Nodes#{Id := Node2#{obtained := Obtained + Inc}}}; _ -> {0, Nodes} end; @@ -411,31 +410,38 @@ dispatch_burst([], State) -> dispatch_burst(GroupL, #{root := #{burst := Burst}, nodes := Nodes} = State) -> - InFlow = erlang:floor(Burst / erlang:length(GroupL)), + InFlow = Burst / erlang:length(GroupL), Dispatch = fun({Zone, Childs}, NodeAcc) -> - #{id := ZoneId, - burst := ZoneBurst, - obtained := Obtained} = Zone, + #{id := ZoneId, + burst := ZoneBurst, + obtained := Obtained} = Zone, - ZoneFlow = erlang:min(InFlow, ZoneBurst), - EachFlow = ZoneFlow div erlang:length(Childs), - Zone2 = Zone#{obtained := Obtained + ZoneFlow}, - NodeAcc2 = NodeAcc#{ZoneId := Zone2}, - dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2) + case erlang:min(InFlow, ZoneBurst) of + 0 -> NodeAcc; + ZoneFlow -> + EachFlow = ZoneFlow / erlang:length(Childs), + {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc), + Zone2 = Zone#{obtained := Obtained + Alloced}, + NodeAcc2#{ZoneId := Zone2} + end end, State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}. -spec dispatch_burst_to_buckets(list(node_id()), - non_neg_integer(), nodes()) -> nodes(). -dispatch_burst_to_buckets(Childs, InFlow, Nodes) -> - Each = fun(ChildId, NodeAcc) -> - #{counter := Counter, - index := Index, - obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc), - counters:add(Counter, Index, InFlow), - NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}} - end, - lists:foldl(Each, Nodes, Childs). + float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}. +dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) -> + #{counter := Counter, + index := Index, + obtained := Obtained} = Bucket = maps:get(ChildId, Nodes), + {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), + + counters:add(Counter, Index, Inc), + + Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}}, + dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2); + +dispatch_burst_to_buckets([], _, Alloced, Nodes) -> + {Alloced, Nodes}. -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). init_tree(Type, State) ->