Merge pull request #6557 from lafirest/fix/limiter

feat(emqx_limiter): improve burst implementation
This commit is contained in:
lafirest 2022-01-05 09:43:22 +08:00 committed by GitHub
commit 81d862061a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 43 deletions

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_correction).
%% API
-export([ add/2 ]).
-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float()
, any() => any()
}.
-export_type([correction_value/0]).
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
-spec add(number(), correction_value()) -> {integer(), correction_value()}.
add(Inc, #{correction := Correction} = Data) ->
FixedInc = Inc + Correction,
IntInc = erlang:floor(FixedInc),
{IntInc, Data#{correction := FixedInc - IntInc}}.

View File

@ -20,7 +20,7 @@
%% API
-export([ add/2, sub/2, mul/2
, add_to_counter/3, put_to_counter/3, floor_div/2]).
, put_to_counter/3, floor_div/2]).
-export_type([decimal/0, zero_or_float/0]).
-type decimal() :: infinity | number().
@ -60,22 +60,6 @@ floor_div(infinity, _) ->
floor_div(A, B) ->
erlang:floor(A / B).
-spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) ->
{zero_or_float(), zero_or_float()}.
add_to_counter(_, _, infinity) ->
{0, 0};
add_to_counter(Counter, Index, Val) when is_float(Val) ->
IntPart = erlang:floor(Val),
if IntPart > 0 ->
counters:add(Counter, Index, IntPart);
true ->
ok
end,
{IntPart, Val - IntPart};
add_to_counter(Counter, Index, Val) ->
counters:add(Counter, Index, Val),
{Val, 0}.
-spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok.
put_to_counter(_, _, infinity) ->
ok;

View File

@ -87,7 +87,7 @@
-define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter
-export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
%%--------------------------------------------------------------------
%% API
@ -317,12 +317,11 @@ longitudinal(#{id := Id,
longitudinal(#{id := Id,
rate := Rate,
capacity := Capacity,
correction := Correction,
counter := Counter,
index := Index,
obtained := Obtained} = Node,
InFlow, Nodes) when Counter =/= undefined ->
Flow = add(erlang:min(InFlow, Rate), Correction),
Flow = erlang:min(InFlow, Rate),
ShouldAlloc =
case counters:get(Counter, Index) of
@ -340,11 +339,11 @@ longitudinal(#{id := Id,
Avaiable when Avaiable > 0 ->
%% XXX if capacity is infinity, and flow always > 0, the value in counter
%% will be overflow at some point in the future, do we need to deal with this situation???
{Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable),
{Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
counters:add(Counter, Index, Inc),
{Alloced,
Nodes#{Id := Node#{obtained := Obtained + Alloced,
correction := Decimal}}};
{Inc,
Nodes#{Id := Node2#{obtained := Obtained + Inc}}};
_ ->
{0, Nodes}
end;
@ -411,31 +410,38 @@ dispatch_burst([], State) ->
dispatch_burst(GroupL,
#{root := #{burst := Burst},
nodes := Nodes} = State) ->
InFlow = erlang:floor(Burst / erlang:length(GroupL)),
InFlow = Burst / erlang:length(GroupL),
Dispatch = fun({Zone, Childs}, NodeAcc) ->
#{id := ZoneId,
burst := ZoneBurst,
obtained := Obtained} = Zone,
#{id := ZoneId,
burst := ZoneBurst,
obtained := Obtained} = Zone,
ZoneFlow = erlang:min(InFlow, ZoneBurst),
EachFlow = ZoneFlow div erlang:length(Childs),
Zone2 = Zone#{obtained := Obtained + ZoneFlow},
NodeAcc2 = NodeAcc#{ZoneId := Zone2},
dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2)
case erlang:min(InFlow, ZoneBurst) of
0 -> NodeAcc;
ZoneFlow ->
EachFlow = ZoneFlow / erlang:length(Childs),
{Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
Zone2 = Zone#{obtained := Obtained + Alloced},
NodeAcc2#{ZoneId := Zone2}
end
end,
State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
-spec dispatch_burst_to_buckets(list(node_id()),
non_neg_integer(), nodes()) -> nodes().
dispatch_burst_to_buckets(Childs, InFlow, Nodes) ->
Each = fun(ChildId, NodeAcc) ->
#{counter := Counter,
index := Index,
obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc),
counters:add(Counter, Index, InFlow),
NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}}
end,
lists:foldl(Each, Nodes, Childs).
float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
#{counter := Counter,
index := Index,
obtained := Obtained} = Bucket = maps:get(ChildId, Nodes),
{Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
counters:add(Counter, Index, Inc),
Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}},
dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2);
dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
{Alloced, Nodes}.
-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
init_tree(Type, State) ->