feat(rate_limiter): implement hierarchical token buckets

This commit is contained in:
lafirest 2021-09-24 19:25:38 +08:00
parent d80f20aca3
commit 8a2c5e2422
15 changed files with 1589 additions and 2 deletions

View File

@ -72,6 +72,7 @@
-export([namespace/0, roots/0, roots/1, fields/1]). -export([namespace/0, roots/0, roots/1, fields/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]). -export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]).
-export([sc/2, map/2]).
namespace() -> undefined. namespace() -> undefined.

View File

@ -368,14 +368,18 @@ typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example =
typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash}; typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash};
typename_to_spec("log_level()", _Mod) -> typename_to_spec("log_level()", _Mod) ->
#{type => string, enum => [debug, info, notice, warning, error, critical, alert, emergency, all]}; #{type => string, enum => [debug, info, notice, warning, error, critical, alert, emergency, all]};
typename_to_spec("rate()", _Mod) ->
#{type => string, example => <<"10M/s">>};
typename_to_spec("bucket_rate()", _Mod) ->
#{type => string, example => <<"10M/s, 100M">>};
typename_to_spec(Name, Mod) -> typename_to_spec(Name, Mod) ->
Spec = range(Name), Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod), Spec1 = remote_module_type(Spec, Name, Mod),
Spec2 = typerefl_array(Spec1, Name, Mod), Spec2 = typerefl_array(Spec1, Name, Mod),
Spec3 = integer(Spec2, Name), Spec3 = integer(Spec2, Name),
Spec3 =:= nomatch andalso Spec3 =:= nomatch andalso
throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}), throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}),
Spec3. Spec3.
range(Name) -> range(Name) ->
case string:split(Name, "..") of case string:split(Name, "..") of

View File

@ -0,0 +1,50 @@
##--------------------------------------------------------------------
## Emq X Rate Limiter
##--------------------------------------------------------------------
emqx_limiter {
bytes_in {
global = "100KB/10s" # token generation rate
zone.default = "100kB/10s"
zone.external = "20kB/10s"
bucket.tcp {
zone = default
aggregated = "100kB/10s,1Mb"
per_client = "100KB/10s,10Kb"
}
bucket.ssl {
zone = external
aggregated = "100kB/10s,1Mb"
per_client = "100KB/10s,10Kb"
}
}
message_in {
global = "100/10s"
zone.default = "100/10s"
bucket.bucket1 {
zone = default
aggregated = "100/10s,1000"
per_client = "100/10s,100"
}
}
connection {
global = "100/10s"
zone.default = "100/10s"
bucket.bucket1 {
zone = default
aggregated = "100/10s,1000"
per_client = "100/10s,100"
}
}
message_routing {
global = "100/10s"
zone.default = "100/10s"
bucket.bucket1 {
zone = default
aggregated = "100/10s,100"
per_client = "100/10s,10"
}
}
}

View File

@ -0,0 +1,15 @@
%% -*- mode: erlang -*-
{application, emqx_limiter,
[{description, "EMQ X Hierachical Limiter"},
{vsn, "1.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_limiter_sup]},
{applications, [kernel,stdlib,emqx]},
{mod, {emqx_limiter_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-retainer"}
]}
]}.

View File

@ -0,0 +1,55 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application is started using
%% application:start/[1,2], and should start the processes of the
%% application. If the application is structured according to the OTP
%% design principles as a supervision tree, this means starting the
%% top supervisor of the tree.
%% @end
%%--------------------------------------------------------------------
-spec start(StartType :: normal |
{takeover, Node :: node()} |
{failover, Node :: node()},
StartArgs :: term()) ->
{ok, Pid :: pid()} |
{ok, Pid :: pid(), State :: term()} |
{error, Reason :: term()}.
start(_StartType, _StartArgs) ->
{ok, _} = Result = emqx_limiter_sup:start_link(),
Result.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application has stopped. It
%% is intended to be the opposite of Module:start/2 and should do
%% any necessary cleaning up. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec stop(State :: term()) -> any().
stop(_State) ->
ok.

View File

@ -0,0 +1,144 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_client).
%% API
-export([create/5, make_ref/3, consume/2]).
-export_type([limiter/0]).
%% tocket bucket algorithm
-record(limiter, { tokens :: non_neg_integer()
, rate :: float()
, capacity :: decimal()
, lasttime :: millisecond()
, ref :: ref_limiter()
}).
-record(ref, { counter :: counters:counters_ref()
, index :: index()
, rate :: decimal()
, obtained :: non_neg_integer()
}).
%% TODO
%% we should add a nop-limiter, when all the upper layers (global, zone, and buckets ) are infinity
-type limiter() :: #limiter{}.
-type ref_limiter() :: #ref{}.
-type client() :: limiter() | ref_limiter().
-type millisecond() :: non_neg_integer().
-type pause_result(Client) :: {pause, millisecond(), Client}.
-type consume_result(Client) :: {ok, Client}
| pause_result(Client).
-type decimal() :: emqx_limiter_decimal:decimal().
-type index() :: emqx_limiter_server:index().
-define(NOW, erlang:monotonic_time(millisecond)).
-define(MINIUMN_PAUSE, 100).
-import(emqx_limiter_decimal, [sub/2]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec create(float(),
decimal(),
counters:counters_ref(),
index(),
decimal()) -> limiter().
create(Rate, Capacity, Counter, Index, CounterRate) ->
#limiter{ tokens = Capacity
, rate = Rate
, capacity = Capacity
, lasttime = ?NOW
, ref = make_ref(Counter, Index, CounterRate)
}.
-spec make_ref(counters:counters_ref(), index(), decimal()) -> ref_limiter().
make_ref(Counter, Idx, Rate) ->
#ref{counter = Counter, index = Idx, rate = Rate, obtained = 0}.
-spec consume(pos_integer(), Client) -> consume_result(Client)
when Client :: client().
consume(Need, #limiter{tokens = Tokens,
capacity = Capacity} = Limiter) ->
if Need =< Tokens ->
try_consume_counter(Need, Limiter);
Need > Capacity ->
%% FIXME
%% The client should be able to send 4kb data if the rate is configured to be 2kb/s, it just needs 2s to complete.
throw("too big request"); %% FIXME how to deal this?
true ->
try_reset(Need, Limiter)
end;
consume(Need, #ref{counter = Counter,
index = Index,
rate = Rate,
obtained = Obtained} = Ref) ->
Tokens = counters:get(Counter, Index),
if Tokens >= Need ->
counters:sub(Counter, Index, Need),
{ok, Ref#ref{obtained = Obtained + Need}};
true ->
return_pause(Need - Tokens, Rate, Ref)
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec try_consume_counter(pos_integer(), limiter()) -> consume_result(limiter()).
try_consume_counter(Need,
#limiter{tokens = Tokens,
ref = #ref{counter = Counter,
index = Index,
obtained = Obtained,
rate = CounterRate} = Ref} = Limiter) ->
CT = counters:get(Counter, Index),
if CT >= Need ->
counters:sub(Counter, Index, Need),
{ok, Limiter#limiter{tokens = sub(Tokens, Need),
ref = Ref#ref{obtained = Obtained + Need}}};
true ->
return_pause(Need - CT, CounterRate, Limiter)
end.
-spec try_reset(pos_integer(), limiter()) -> consume_result(limiter()).
try_reset(Need,
#limiter{tokens = Tokens,
rate = Rate,
lasttime = LastTime,
capacity = Capacity} = Limiter) ->
Now = ?NOW,
Inc = erlang:floor((Now - LastTime) * Rate / emqx_limiter_schema:minimum_period()),
Tokens2 = erlang:min(Tokens + Inc, Capacity),
if Need > Tokens2 ->
return_pause(Need, Rate, Limiter);
true ->
Limiter2 = Limiter#limiter{tokens = Tokens2,
lasttime = Now},
try_consume_counter(Need, Limiter2)
end.
-spec return_pause(pos_integer(), decimal(), Client) -> pause_result(Client)
when Client :: client().
return_pause(_, infinity, Limiter) ->
%% workaround when emqx_limiter_server's rate is infinity
{pause, ?MINIUMN_PAUSE, Limiter};
return_pause(Diff, Rate, Limiter) ->
Pause = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate),
{pause, erlang:max(Pause, ?MINIUMN_PAUSE), Limiter}.

View File

@ -0,0 +1,79 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a simple decimal module for rate-related calculations
-module(emqx_limiter_decimal).
%% API
-export([ add/2, sub/2, mul/2
, add_to_counter/3, put_to_counter/3]).
-export_type([decimal/0, zero_or_float/0]).
-type decimal() :: infinity | number().
-type zero_or_float() :: 0 | float().
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
-spec add(decimal(), decimal()) -> decimal().
add(A, B) when A =:= infinity
orelse B =:= infinity ->
infinity;
add(A, B) ->
A + B.
-spec sub(decimal(), decimal()) -> decimal().
sub(A, B) when A =:= infinity
orelse B =:= infinity ->
infinity;
sub(A, B) ->
A - B.
-spec mul(decimal(), decimal()) -> decimal().
mul(A, B) when A =:= infinity
orelse B =:= infinity ->
infinity;
mul(A, B) ->
A * B.
-spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) ->
{zero_or_float(), zero_or_float()}.
add_to_counter(_, _, infinity) ->
{0, 0};
add_to_counter(Counter, Index, Val) when is_float(Val) ->
IntPart = erlang:floor(Val),
if IntPart > 0 ->
counters:add(Counter, Index, IntPart);
true ->
ok
end,
{IntPart, Val - IntPart};
add_to_counter(Counter, Index, Val) ->
counters:add(Counter, Index, Val),
{Val, 0}.
-spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok.
put_to_counter(_, _, infinity) ->
ok;
put_to_counter(Counter, Index, Val) when is_float(Val) ->
IntPart = erlang:floor(Val),
counters:put(Counter, Index, IntPart);
put_to_counter(Counter, Index, Val) ->
counters:put(Counter, Index, Val).

View File

@ -0,0 +1,229 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_manager).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% API
-export([ start_link/0, start_server/1, find_counter/1
, find_counter/3, insert_counter/4, insert_counter/6
, make_path/3, restart_server/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-type path() :: list(atom()).
-type limiter_type() :: emqx_limiter_schema:limiter_type().
-type zone_name() :: emqx_limiter_schema:zone_name().
-type bucket_name() :: emqx_limiter_schema:bucket_name().
%% counter record in ets table
-record(element, {path :: path(),
counter :: counters:counters_ref(),
index :: index(),
rate :: rate()
}).
-type index() :: emqx_limiter_server:index().
-type rate() :: emqx_limiter_decimal:decimal().
-define(TAB, emqx_limiter_counters).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec start_server(limiter_type()) -> _.
start_server(Type) ->
emqx_limiter_server_sup:start(Type).
-spec restart_server(limiter_type()) -> _.
restart_server(Type) ->
emqx_limiter_server_sup:restart(Type).
-spec find_counter(limiter_type(), zone_name(), bucket_name()) ->
{ok, counters:counters_ref(), index(), rate()} | undefined.
find_counter(Type, Zone, BucketId) ->
find_counter(make_path(Type, Zone, BucketId)).
-spec find_counter(path()) ->
{ok, counters:counters_ref(), index(), rate()} | undefined.
find_counter(Path) ->
case ets:lookup(?TAB, Path) of
[#element{counter = Counter, index = Index, rate = Rate}] ->
{ok, Counter, Index, Rate};
_ ->
undefined
end.
-spec insert_counter(limiter_type(),
zone_name(),
bucket_name(),
counters:counters_ref(),
index(),
rate()) -> boolean().
insert_counter(Type, Zone, BucketId, Counter, Index, Rate) ->
insert_counter(make_path(Type, Zone, BucketId),
Counter,
Index,
Rate).
-spec insert_counter(path(),
counters:counters_ref(),
index(),
rate()) -> boolean().
insert_counter(Path, Counter, Index, Rate) ->
ets:insert(?TAB,
#element{path = Path,
counter = Counter,
index = Index,
rate = Rate}).
-spec make_path(limiter_type(), zone_name(), bucket_name()) -> path().
make_path(Type, Name, BucketId) ->
[Type, Name, BucketId].
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} |
{ok, State :: term(), Timeout :: timeout()} |
{ok, State :: term(), hibernate} |
{stop, Reason :: term()} |
ignore.
init([]) ->
_ = ets:new(?TAB, [ set, public, named_table, {keypos, #element.path}
, {write_concurrency, true}, {read_concurrency, true}
, {heir, erlang:whereis(emqx_limiter_sup), none}
]),
{ok, #{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
{reply, Reply :: term(), NewState :: term(), hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignore, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
handle_cast(Req, State) ->
?LOG(error, "Unexpected cast: ~p", [Req]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: normal | term(), NewState :: term()}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()) -> {ok, NewState :: term()} |
{error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for changing the form and appearance
%% of gen_server status when it is returned from sys:get_status/1,2
%% or when it appears in termination error logs.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
Status :: list()) -> Status :: term().
format_status(_Opt, Status) ->
Status.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -0,0 +1,140 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_schema).
-include_lib("typerefl/include/types.hrl").
-export([ roots/0, fields/1, to_rate/1
, to_bucket_rate/1, minimum_period/0]).
-define(KILOBYTE, 1024).
-type limiter_type() :: bytes_in
| message_in
| connection
| message_routing.
-type bucket_name() :: atom().
-type zone_name() :: atom().
-type rate() :: infinity | float().
-type bucket_rate() :: list(infinity | number()).
-typerefl_from_string({rate/0, ?MODULE, to_rate}).
-typerefl_from_string({bucket_rate/0, ?MODULE, to_bucket_rate}).
-reflect_type([ rate/0
, bucket_rate/0
]).
-export_type([limiter_type/0, bucket_name/0, zone_name/0]).
-import(emqx_schema, [sc/2, map/2]).
roots() -> [emqx_limiter].
fields(emqx_limiter) ->
[ {bytes_in, sc(ref(limiter), #{})}
, {message_in, sc(ref(limiter), #{})}
, {connection, sc(ref(limiter), #{})}
, {message_routing, sc(ref(limiter), #{})}
];
fields(limiter) ->
[ {global, sc(rate(), #{})}
, {zone, sc(map("zone name", rate()), #{})}
, {bucket, sc(map("bucket id", ref(bucket)),
#{desc => "Token Buckets"})}
];
fields(bucket) ->
[ {zone, sc(atom(), #{desc => "the zone which the bucket in"})}
, {aggregated, sc(bucket_rate(), #{})}
, {per_client, sc(bucket_rate(), #{})}
].
%% minimum period is 100ms
minimum_period() ->
100.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
ref(Field) -> hoconsc:ref(?MODULE, Field).
to_rate(Str) ->
Tokens = [string:trim(T) || T <- string:tokens(Str, "/")],
case Tokens of
["infinity"] ->
{ok, infinity};
[Quota, Interval] ->
{ok, Val} = to_quota(Quota),
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, Val * minimum_period() / Ms};
_ ->
{error, Str}
end;
_ ->
{error, Str}
end.
to_bucket_rate(Str) ->
Tokens = [string:trim(T) || T <- string:tokens(Str, "/,")],
case Tokens of
[Rate, Capa] ->
{ok, infinity} = to_quota(Rate),
{ok, CapaVal} = to_quota(Capa),
if CapaVal =/= infinity ->
{ok, [infinity, CapaVal]};
true ->
{error, Str}
end;
[Quota, Interval, Capacity] ->
{ok, Val} = to_quota(Quota),
case emqx_schema:to_duration_ms(Interval) of
{ok, Ms} when Ms > 0 ->
{ok, CapaVal} = to_quota(Capacity),
{ok, [Val * minimum_period() / Ms, CapaVal]};
_ ->
{error, Str}
end;
_ ->
{error, Str}
end.
to_quota(Str) ->
{ok, MP} = re:compile("^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$"),
Result = re:run(Str, MP, [{capture, all_but_first, list}]),
case Result of
{match, [Quota, Unit]} ->
Val = erlang:list_to_integer(Quota),
Unit2 = string:to_lower(Unit),
{ok, apply_unit(Unit2, Val)};
{match, [Quota]} ->
{ok, erlang:list_to_integer(Quota)};
{match, []} ->
{ok, infinity};
_ ->
{error, Str}
end.
apply_unit("", Val) -> Val;
apply_unit("kb", Val) -> Val * ?KILOBYTE;
apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE;
apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE;
apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit).

View File

@ -0,0 +1,426 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% A hierachical token bucket algorithm
%% Note: this is not the linux HTB algorithm(http://luxik.cdi.cz/~devik/qos/htb/manual/theory.htm)
%% Algorithm:
%% 1. the root node periodically generates tokens and then distributes them
%% just like the oscillation of water waves
%% 2. the leaf node has a counter, which is the place where the token is actually held.
%% 3. other nodes only play the role of transmission, and the rate of the node is like a valve,
%% limiting the oscillation transmitted from the parent node
-module(emqx_limiter_server).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, format_status/2]).
-export([ start_link/1, connect/2, info/2
, name/1]).
-record(root, { rate :: rate() %% number of tokens generated per period
, period :: pos_integer() %% token generation interval(second)
, childs :: list(node_id()) %% node children
, consumed :: non_neg_integer()
}).
-record(zone, { id :: pos_integer()
, name :: zone_name()
, rate :: rate()
, obtained :: non_neg_integer() %% number of tokens obtained
, childs :: list(node_id())
}).
-record(bucket, { id :: pos_integer()
, name :: bucket_name()
, rate :: rate()
, obtained :: non_neg_integer()
, correction :: emqx_limiter_decimal:zero_or_float() %% token correction value
, capacity :: capacity()
, counter :: counters:counters_ref()
, index :: index()
}).
-record(state, { root :: undefined | root()
, counter :: undefined | counters:counters_ref() %% current counter to alloc
, index :: index()
, zones :: #{zone_name() => node_id()}
, nodes :: nodes()
, type :: limiter_type()
}).
%% maybe use maps is better, but record is fastter
-define(FIELD_OBTAINED, #zone.obtained).
-define(GET_FIELD(F, Node), element(F, Node)).
-define(CALL(Type, Msg), gen_server:call(name(Type), {?FUNCTION_NAME, Msg})).
-type node_id() :: pos_integer().
-type root() :: #root{}.
-type zone() :: #zone{}.
-type bucket() :: #bucket{}.
-type node_data() :: zone() | bucket().
-type nodes() :: #{node_id() => node_data()}.
-type zone_name() :: emqx_limiter_schema:zone_name().
-type limiter_type() :: emqx_limiter_schema:limiter_type().
-type bucket_name() :: emqx_limiter_schema:bucket_name().
-type rate() :: decimal().
-type flow() :: decimal().
-type capacity() :: decimal().
-type decimal() :: emqx_limiter_decimal:decimal().
-type state() :: #state{}.
-type index() :: pos_integer().
-export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec connect(limiter_type(), bucket_name()) -> emqx_limiter_client:client().
connect(Type, Bucket) ->
#{zone := Zone,
aggregated := [Aggr, Capacity],
per_client := [Client, ClientCapa]} = emqx:get_config([emqx_limiter, Type, bucket, Bucket]),
case emqx_limiter_manager:find_counter(Type, Zone, Bucket) of
{ok, Counter, Idx, Rate} ->
if Client =/= infinity andalso (Client < Aggr orelse ClientCapa < Capacity) ->
emqx_limiter_client:create(Client, ClientCapa, Counter, Idx, Rate);
true ->
emqx_limiter_client:make_ref(Counter, Idx, Rate)
end;
_ ->
?LOG(error, "can't find the bucket:~p which type is:~p~n", [Bucket, Type]),
throw("invalid bucket")
end.
-spec info(limiter_type(), atom()) -> term().
info(Type, Info) ->
?CALL(Type, Info).
-spec name(limiter_type()) -> atom().
name(Type) ->
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%% @end
%%--------------------------------------------------------------------
-spec start_link(limiter_type()) -> _.
start_link(Type) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
%%--------------------------------------------------------------------
%%% gen_server callbacks
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} |
{ok, State :: term(), Timeout :: timeout()} |
{ok, State :: term(), hibernate} |
{stop, Reason :: term()} |
ignore.
init([Type]) ->
State = #state{zones = #{},
nodes = #{},
type = Type,
index = 1},
State2 = init_tree(Type, State),
oscillate(State2#state.root#root.period),
{ok, State2}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%% @end
%%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} |
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
{reply, Reply :: term(), NewState :: term(), hibernate} |
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
{stop, Reason :: term(), NewState :: term()}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%% @end
%%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: term(), NewState :: term()}.
handle_cast(Req, State) ->
?LOG(error, "Unexpected cast: ~p", [Req]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), Timeout :: timeout()} |
{noreply, NewState :: term(), hibernate} |
{stop, Reason :: normal | term(), NewState :: term()}.
handle_info(oscillate, State) ->
{noreply, oscillation(State)};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% @end
%%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()) -> any().
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()},
State :: term(),
Extra :: term()) -> {ok, NewState :: term()} |
{error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for changing the form and appearance
%% of gen_server status when it is returned from sys:get_status/1,2
%% or when it appears in termination error logs.
%% @end
%%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate,
Status :: list()) -> Status :: term().
format_status(_Opt, Status) ->
Status.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
oscillate(Interval) ->
erlang:send_after(Interval, self(), ?FUNCTION_NAME).
%% @doc generate tokens, and then spread to leaf nodes
-spec oscillation(state()) -> state().
oscillation(#state{root = #root{rate = Flow,
period = Interval,
childs = ChildIds,
consumed = Consumed} = Root,
nodes = Nodes} = State) ->
oscillate(Interval),
Childs = get_orderd_childs(ChildIds, Nodes),
{Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
State#state{nodes = Nodes2,
root = Root#root{consumed = Consumed + Alloced}}.
%% @doc horizontal spread
-spec transverse(list(node_data()),
flow(),
non_neg_integer(),
nodes()) -> {non_neg_integer(), nodes()}.
transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 ->
{NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes),
InFlow2 = sub(InFlow, NodeAlloced),
Alloced2 = Alloced + NodeAlloced,
transverse(T, InFlow2, Alloced2, Nodes2);
transverse(_, _, Alloced, Nodes) ->
{Alloced, Nodes}.
%% @doc vertical spread
-spec longitudinal(node_data(), flow(), nodes()) ->
{non_neg_integer(), nodes()}.
longitudinal(#zone{id = Id,
rate = Rate,
obtained = Obtained,
childs = ChildIds} = Node, InFlow, Nodes) ->
Flow = erlang:min(InFlow, Rate),
if Flow > 0 ->
Childs = get_orderd_childs(ChildIds, Nodes),
{Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes),
if Alloced > 0 ->
{Alloced,
Nodes2#{Id => Node#zone{obtained = Obtained + Alloced}}};
true ->
%% childs are empty or all counter childs are full
{0, Nodes}
end;
true ->
{0, Nodes}
end;
longitudinal(#bucket{id = Id,
rate = Rate,
capacity = Capacity,
correction = Correction,
counter = Counter,
index = Index,
obtained = Obtained} = Node, InFlow, Nodes) ->
Flow = add(erlang:min(InFlow, Rate), Correction),
Tokens = counters:get(Counter, Index),
%% toknes's value mayb be a negative value(stolen from the future)
Avaiable = erlang:min(if Tokens < 0 ->
add(Capacity, Tokens);
true ->
sub(Capacity, Tokens)
end, Flow),
FixAvaiable = erlang:min(Capacity, Avaiable),
if FixAvaiable > 0 ->
{Alloced, Decimal} = add_to_counter(Counter, Index, FixAvaiable),
{Alloced,
Nodes#{Id => Node#bucket{obtained = Obtained + Alloced,
correction = Decimal}}};
true ->
{0, Nodes}
end.
-spec get_orderd_childs(list(node_id()), nodes()) -> list(node_data()).
get_orderd_childs(Ids, Nodes) ->
Childs = [maps:get(Id, Nodes) || Id <- Ids],
%% sort by obtained, avoid node goes hungry
lists:sort(fun(A, B) ->
?GET_FIELD(?FIELD_OBTAINED, A) < ?GET_FIELD(?FIELD_OBTAINED, B)
end,
Childs).
-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
init_tree(Type, State) ->
#{global := Global,
zone := Zone,
bucket := Bucket} = emqx:get_config([emqx_limiter, Type]),
{Factor, Root} = make_root(Global, Zone),
State2 = State#state{root = Root},
{NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),
State4 = State3#state{counter = counters:new(maps:size(Bucket),
[write_concurrency])},
make_bucket(maps:to_list(Bucket), Factor, NodeId, State4).
-spec make_root(decimal(), hocon:config()) -> {number(), root()}.
make_root(Rate, Zone) ->
ZoneNum = maps:size(Zone),
Childs = lists:seq(1, ZoneNum),
MiniPeriod = emqx_limiter_schema:minimum_period(),
if Rate >= 1 ->
{1, #root{rate = Rate,
period = MiniPeriod,
childs = Childs,
consumed = 0}};
true ->
Factor = 1 / Rate,
{Factor, #root{rate = 1,
period = erlang:floor(Factor * MiniPeriod),
childs = Childs,
consumed = 0}}
end.
make_zone([{Name, Rate} | T], Factor, NodeId, State) ->
#state{zones = Zones, nodes = Nodes} = State,
Zone = #zone{id = NodeId,
name = Name,
rate = mul(Rate, Factor),
obtained = 0,
childs = []},
State2 = State#state{zones = Zones#{Name => NodeId},
nodes = Nodes#{NodeId => Zone}},
make_zone(T, Factor, NodeId + 1, State2);
make_zone([], _, NodeId, State2) ->
{NodeId, State2}.
make_bucket([{Name, Conf} | T], Factor, NodeId, State) ->
#{zone := ZoneName,
aggregated := [Rate, Capacity]} = Conf,
{Counter, Idx, State2} = alloc_counter(ZoneName, Name, Rate, State),
Node = #bucket{ id = NodeId
, name = Name
, rate = mul(Rate, Factor)
, obtained = 0
, correction = 0
, capacity = Capacity
, counter = Counter
, index = Idx},
State3 = add_zone_child(NodeId, Node, ZoneName, State2),
make_bucket(T, Factor, NodeId + 1, State3);
make_bucket([], _, _, State) ->
State.
-spec alloc_counter(zone_name(), bucket_name(), rate(), state()) ->
{counters:counters_ref(), pos_integer(), state()}.
alloc_counter(Zone, Bucket, Rate,
#state{type = Type, counter = Counter, index = Index} = State) ->
Path = emqx_limiter_manager:make_path(Type, Zone, Bucket),
case emqx_limiter_manager:find_counter(Path) of
undefined ->
init_counter(Path, Counter, Index,
Rate, State#state{index = Index + 1});
{ok, ECounter, EIndex, _} ->
init_counter(Path, ECounter, EIndex, Rate, State)
end.
init_counter(Path, Counter, Index, Rate, State) ->
_ = put_to_counter(Counter, Index, 0),
emqx_limiter_manager:insert_counter(Path, Counter, Index, Rate),
{Counter, Index, State}.
-spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state().
add_zone_child(NodeId, Bucket, Name, #state{zones = Zones, nodes = Nodes} = State) ->
ZoneId = maps:get(Name, Zones),
#zone{childs = Childs} = Zone = maps:get(ZoneId, Nodes),
Nodes2 = Nodes#{ZoneId => Zone#zone{childs = [NodeId | Childs]},
NodeId => Bucket},
State#state{nodes = Nodes2}.

View File

@ -0,0 +1,94 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_server_sup).
-behaviour(supervisor).
%% API
-export([start_link/0, start/1, restart/1]).
%% Supervisor callbacks
-export([init/1]).
%%--==================================================================
%% API functions
%%--==================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, {already_started, Pid :: pid()}} |
{error, {shutdown, term()}} |
{error, term()} |
ignore.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start(emqx_limiter_schema:limiter_type()) -> _.
start(Type) ->
Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec).
-spec restart(emqx_limiter_schema:limiter_type()) -> _.
restart(Type) ->
Id = emqx_limiter_server:name(Type),
_ = supervisor:terminate_child(?MODULE, Id),
supervisor:restart_child(?MODULE, Id).
%%--==================================================================
%% Supervisor callbacks
%%--==================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart intensity, and child
%% specifications.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(),
[ChildSpec :: supervisor:child_spec()]}} |
ignore.
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 3600},
{ok, {SupFlags, childs()}}.
%%--==================================================================
%% Internal functions
%%--==================================================================
make_child(Type) ->
Id = emqx_limiter_server:name(Type),
#{id => Id,
start => {emqx_limiter_server, start_link, [Type]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [emqx_limiter_server]}.
childs() ->
Conf = emqx:get_config([emqx_limiter]),
Types = maps:keys(Conf),
[make_child(Type) || Type <- Types].

View File

@ -0,0 +1,76 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%%--------------------------------------------------------------------
%% API functions
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%% @end
%%--------------------------------------------------------------------
-spec start_link() -> {ok, Pid :: pid()} |
{error, {already_started, Pid :: pid()}} |
{error, {shutdown, term()}} |
{error, term()} |
ignore.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart intensity, and child
%% specifications.
%% @end
%%--------------------------------------------------------------------
-spec init(Args :: term()) ->
{ok, {SupFlags :: supervisor:sup_flags(),
[ChildSpec :: supervisor:child_spec()]}} |
ignore.
init([]) ->
SupFlags = #{strategy => one_for_one,
intensity => 10,
period => 3600},
Childs = [ make_child(emqx_limiter_manager, worker)
, make_child(emqx_limiter_server_sup, supervisor)],
{ok, {SupFlags, Childs}}.
make_child(Mod, Type) ->
#{id => Mod,
start => {Mod, start_link, []},
restart => transient,
type => Type,
modules => [Mod]}.

View File

@ -0,0 +1,272 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx_limiter).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BASE_CONF, <<"""
emqx_limiter {
bytes_in {global = \"100KB/10s\"
zone.default = \"100kB/10s\"
zone.external = \"20kB/10s\"
bucket.tcp {zone = default
aggregated = \"100kB/10s,1Mb\"
per_client = \"100KB/10s,10Kb\"}
bucket.ssl {zone = external
aggregated = \"100kB/10s,1Mb\"
per_client = \"100KB/10s,10Kb\"}
}
message_in {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,1000\"
per_client = \"100/10s,100\"}
}
connection {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,100\"
per_client = \"100/10s,10\"
}
}
message_routing {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,100\"
per_client = \"100/10s,10\"
}
}
}""">>).
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-record(client_options, { interval :: non_neg_integer()
, per_cost :: non_neg_integer()
, type :: atom()
, bucket :: atom()
, lifetime :: non_neg_integer()
, rates :: list(tuple())
}).
-record(client_state, { client :: emqx_limiter_client:limiter()
, pid :: pid()
, got :: non_neg_integer()
, options :: #client_options{}}).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF),
emqx_ct_helpers:start_apps([?APP]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([?APP]).
init_per_testcase(_TestCase, Config) ->
Config.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_un_overload(_) ->
Conf = emqx:get_config([emqx_limiter]),
Conn = #{global => to_rate("infinity"),
zone => #{z1 => to_rate("1000/1s"),
z2 => to_rate("1000/1s")},
bucket => #{b1 => #{zone => z1,
aggregated => to_bucket_rate("100/1s, 500"),
per_client => to_bucket_rate("10/1s, 50")},
b2 => #{zone => z2,
aggregated => to_bucket_rate("500/1s, 500"),
per_client => to_bucket_rate("100/1s, infinity")
}}},
Conf2 = Conf#{connection => Conn},
emqx_config:put([emqx_limiter], Conf2),
{ok, _} = emqx_limiter_manager:restart_server(connection),
timer:sleep(200),
B1C = #client_options{interval = 100,
per_cost = 1,
type = connection,
bucket = b1,
lifetime = timer:seconds(3),
rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]},
{fun erlang:'=:='/2, ["10/1s"]}]},
B2C = #client_options{interval = 100,
per_cost = 10,
type = connection,
bucket = b2,
lifetime = timer:seconds(3),
rates = [{fun erlang:'=<'/2, ["1000/1s", "500/1s"]},
{fun erlang:'=:='/2, ["100/1s"]}]},
lists:foreach(fun(_) -> start_client(B1C) end,
lists:seq(1, 10)),
lists:foreach(fun(_) -> start_client(B2C) end,
lists:seq(1, 5)),
?assert(check_client_result(10 + 5)).
t_infinity(_) ->
Conf = emqx:get_config([emqx_limiter]),
Conn = #{global => to_rate("infinity"),
zone => #{z1 => to_rate("1000/1s"),
z2 => to_rate("infinity")},
bucket => #{b1 => #{zone => z1,
aggregated => to_bucket_rate("100/1s, infinity"),
per_client => to_bucket_rate("10/1s, 100")},
b2 => #{zone => z2,
aggregated => to_bucket_rate("infinity, 600"),
per_client => to_bucket_rate("100/1s, infinity")
}}},
Conf2 = Conf#{connection => Conn},
emqx_config:put([emqx_limiter], Conf2),
{ok, _} = emqx_limiter_manager:restart_server(connection),
timer:sleep(200),
B1C = #client_options{interval = 100,
per_cost = 1,
type = connection,
bucket = b1,
lifetime = timer:seconds(3),
rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]},
{fun erlang:'=:='/2, ["10/1s"]}]},
B2C = #client_options{interval = 100,
per_cost = 10,
type = connection,
bucket = b2,
lifetime = timer:seconds(3),
rates = [{fun erlang:'=:='/2, ["100/1s"]}]},
lists:foreach(fun(_) -> start_client(B1C) end,
lists:seq(1, 8)),
lists:foreach(fun(_) -> start_client(B2C) end,
lists:seq(1, 4)),
?assert(check_client_result(8 + 4)).
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
start_client(Opts) ->
Pid = self(),
erlang:spawn(fun() -> enter_client(Opts, Pid) end).
enter_client(#client_options{type = Type,
bucket = Bucket,
lifetime = Lifetime} = Opts,
Pid) ->
erlang:send_after(Lifetime, self(), stop),
erlang:send(self(), consume),
Client = emqx_limiter_server:connect(Type, Bucket),
client_loop(#client_state{client = Client,
pid = Pid,
got = 0,
options = Opts}).
client_loop(#client_state{client = Client,
got = Got,
pid = Pid,
options = #client_options{interval = Interval,
per_cost = PerCost,
lifetime = Lifetime,
rates = Rates}} = State) ->
receive
consume ->
case emqx_limiter_client:consume(PerCost, Client) of
{ok, Client2} ->
erlang:send_after(Interval, self(), consume),
client_loop(State#client_state{client = Client2,
got = Got + PerCost});
{pause, MS, Client2} ->
erlang:send_after(MS, self(), {resume, erlang:system_time(millisecond)}),
client_loop(State#client_state{client = Client2})
end;
stop ->
Rate = Got * emqx_limiter_schema:minimum_period() / Lifetime,
?LOGT("Got:~p, Rate is:~p Checks:~p~n", [Got, Rate, Rate]),
Check = check_rates(Rate, Rates),
erlang:send(Pid, {client, Check});
{resume, Begin} ->
case emqx_limiter_client:consume(PerCost, Client) of
{ok, Client2} ->
Now = erlang:system_time(millisecond),
Diff = erlang:max(0, Interval - (Now - Begin)),
erlang:send_after(Diff, self(), consume),
client_loop(State#client_state{client = Client2,
got = Got + PerCost});
{pause, MS, Client2} ->
erlang:send_after(MS, self(), {resume, Begin}),
client_loop(State#client_state{client = Client2})
end
end.
check_rates(Rate, [{Fun, Rates} | T]) ->
case lists:all(fun(E) -> Fun(Rate, to_rate(E)) end, Rates) of
true ->
check_rates(Rate, T);
false ->
false
end;
check_rates(_, _) ->
true.
check_client_result(0) ->
true;
check_client_result(N) ->
?LOGT("check_client_result:~p~n", [N]),
receive
{client, true} ->
check_client_result(N - 1);
{client, false} ->
false;
Any ->
?LOGT(">>>> other:~p~n", [Any])
after 3500 ->
?LOGT(">>>> timeout~n", []),
false
end.
to_rate(Str) ->
{ok, Rate} = emqx_limiter_schema:to_rate(Str),
Rate.
to_bucket_rate(Str) ->
{ok, Result} = emqx_limiter_schema:to_bucket_rate(Str),
Result.

View File

@ -54,6 +54,7 @@
, emqx_rule_engine_schema , emqx_rule_engine_schema
, emqx_exhook_schema , emqx_exhook_schema
, emqx_psk_schema , emqx_psk_schema
, emqx_limiter_schema
]). ]).
namespace() -> undefined. namespace() -> undefined.

View File

@ -279,6 +279,7 @@ relx_apps(ReleaseType) ->
, emqx_statsd , emqx_statsd
, emqx_prometheus , emqx_prometheus
, emqx_psk , emqx_psk
, emqx_limiter
] ]
++ [quicer || is_quicer_supported()] ++ [quicer || is_quicer_supported()]
++ [emqx_license || is_enterprise()] ++ [emqx_license || is_enterprise()]