diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bbc40a1e7..9dcd13154 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -72,6 +72,7 @@ -export([namespace/0, roots/0, roots/1, fields/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]). +-export([sc/2, map/2]). namespace() -> undefined. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 47453d246..1c8aeaf83 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -368,14 +368,18 @@ typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example = typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash}; typename_to_spec("log_level()", _Mod) -> #{type => string, enum => [debug, info, notice, warning, error, critical, alert, emergency, all]}; +typename_to_spec("rate()", _Mod) -> + #{type => string, example => <<"10M/s">>}; +typename_to_spec("bucket_rate()", _Mod) -> + #{type => string, example => <<"10M/s, 100M">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), Spec2 = typerefl_array(Spec1, Name, Mod), Spec3 = integer(Spec2, Name), Spec3 =:= nomatch andalso - throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}), - Spec3. + throw({error, #{msg => <<"Unsupport Type">>, type => Name, module => Mod}}), + Spec3. range(Name) -> case string:split(Name, "..") of diff --git a/apps/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx_limiter/etc/emqx_limiter.conf new file mode 100644 index 000000000..44bbb1740 --- /dev/null +++ b/apps/emqx_limiter/etc/emqx_limiter.conf @@ -0,0 +1,50 @@ +##-------------------------------------------------------------------- +## Emq X Rate Limiter +##-------------------------------------------------------------------- +emqx_limiter { + bytes_in { + global = "100KB/10s" # token generation rate + zone.default = "100kB/10s" + zone.external = "20kB/10s" + bucket.tcp { + zone = default + aggregated = "100kB/10s,1Mb" + per_client = "100KB/10s,10Kb" + } + bucket.ssl { + zone = external + aggregated = "100kB/10s,1Mb" + per_client = "100KB/10s,10Kb" + } + } + + message_in { + global = "100/10s" + zone.default = "100/10s" + bucket.bucket1 { + zone = default + aggregated = "100/10s,1000" + per_client = "100/10s,100" + } + } + + connection { + global = "100/10s" + zone.default = "100/10s" + bucket.bucket1 { + zone = default + aggregated = "100/10s,1000" + per_client = "100/10s,100" + } + } + + message_routing { + global = "100/10s" + zone.default = "100/10s" + bucket.bucket1 { + zone = default + aggregated = "100/10s,100" + per_client = "100/10s,10" + } + } +} \ No newline at end of file diff --git a/apps/emqx_limiter/src/emqx_limiter.app.src b/apps/emqx_limiter/src/emqx_limiter.app.src new file mode 100644 index 000000000..70fe89e97 --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter.app.src @@ -0,0 +1,15 @@ +%% -*- mode: erlang -*- +{application, emqx_limiter, + [{description, "EMQ X Hierachical Limiter"}, + {vsn, "1.0.0"}, % strict semver, bump manually! + {modules, []}, + {registered, [emqx_limiter_sup]}, + {applications, [kernel,stdlib,emqx]}, + {mod, {emqx_limiter_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx-retainer"} + ]} + ]}. diff --git a/apps/emqx_limiter/src/emqx_limiter_app.erl b/apps/emqx_limiter/src/emqx_limiter_app.erl new file mode 100644 index 000000000..2244a0e91 --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_app.erl @@ -0,0 +1,55 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called whenever an application is started using +%% application:start/[1,2], and should start the processes of the +%% application. If the application is structured according to the OTP +%% design principles as a supervision tree, this means starting the +%% top supervisor of the tree. +%% @end +%%-------------------------------------------------------------------- +-spec start(StartType :: normal | + {takeover, Node :: node()} | + {failover, Node :: node()}, + StartArgs :: term()) -> + {ok, Pid :: pid()} | + {ok, Pid :: pid(), State :: term()} | + {error, Reason :: term()}. +start(_StartType, _StartArgs) -> + {ok, _} = Result = emqx_limiter_sup:start_link(), + Result. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called whenever an application has stopped. It +%% is intended to be the opposite of Module:start/2 and should do +%% any necessary cleaning up. The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +-spec stop(State :: term()) -> any(). +stop(_State) -> + ok. diff --git a/apps/emqx_limiter/src/emqx_limiter_client.erl b/apps/emqx_limiter/src/emqx_limiter_client.erl new file mode 100644 index 000000000..eb7c768ff --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_client.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_client). + +%% API +-export([create/5, make_ref/3, consume/2]). +-export_type([limiter/0]). + +%% tocket bucket algorithm +-record(limiter, { tokens :: non_neg_integer() + , rate :: float() + , capacity :: decimal() + , lasttime :: millisecond() + , ref :: ref_limiter() + }). + +-record(ref, { counter :: counters:counters_ref() + , index :: index() + , rate :: decimal() + , obtained :: non_neg_integer() + }). + +%% TODO +%% we should add a nop-limiter, when all the upper layers (global, zone, and buckets ) are infinity + +-type limiter() :: #limiter{}. +-type ref_limiter() :: #ref{}. +-type client() :: limiter() | ref_limiter(). +-type millisecond() :: non_neg_integer(). +-type pause_result(Client) :: {pause, millisecond(), Client}. +-type consume_result(Client) :: {ok, Client} + | pause_result(Client). +-type decimal() :: emqx_limiter_decimal:decimal(). +-type index() :: emqx_limiter_server:index(). + +-define(NOW, erlang:monotonic_time(millisecond)). +-define(MINIUMN_PAUSE, 100). + +-import(emqx_limiter_decimal, [sub/2]). +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec create(float(), + decimal(), + counters:counters_ref(), + index(), + decimal()) -> limiter(). +create(Rate, Capacity, Counter, Index, CounterRate) -> + #limiter{ tokens = Capacity + , rate = Rate + , capacity = Capacity + , lasttime = ?NOW + , ref = make_ref(Counter, Index, CounterRate) + }. + +-spec make_ref(counters:counters_ref(), index(), decimal()) -> ref_limiter(). +make_ref(Counter, Idx, Rate) -> + #ref{counter = Counter, index = Idx, rate = Rate, obtained = 0}. + +-spec consume(pos_integer(), Client) -> consume_result(Client) + when Client :: client(). +consume(Need, #limiter{tokens = Tokens, + capacity = Capacity} = Limiter) -> + if Need =< Tokens -> + try_consume_counter(Need, Limiter); + Need > Capacity -> + %% FIXME + %% The client should be able to send 4kb data if the rate is configured to be 2kb/s, it just needs 2s to complete. + throw("too big request"); %% FIXME how to deal this? + true -> + try_reset(Need, Limiter) + end; + +consume(Need, #ref{counter = Counter, + index = Index, + rate = Rate, + obtained = Obtained} = Ref) -> + Tokens = counters:get(Counter, Index), + if Tokens >= Need -> + counters:sub(Counter, Index, Need), + {ok, Ref#ref{obtained = Obtained + Need}}; + true -> + return_pause(Need - Tokens, Rate, Ref) + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +-spec try_consume_counter(pos_integer(), limiter()) -> consume_result(limiter()). +try_consume_counter(Need, + #limiter{tokens = Tokens, + ref = #ref{counter = Counter, + index = Index, + obtained = Obtained, + rate = CounterRate} = Ref} = Limiter) -> + CT = counters:get(Counter, Index), + if CT >= Need -> + counters:sub(Counter, Index, Need), + {ok, Limiter#limiter{tokens = sub(Tokens, Need), + ref = Ref#ref{obtained = Obtained + Need}}}; + true -> + return_pause(Need - CT, CounterRate, Limiter) + end. + +-spec try_reset(pos_integer(), limiter()) -> consume_result(limiter()). +try_reset(Need, + #limiter{tokens = Tokens, + rate = Rate, + lasttime = LastTime, + capacity = Capacity} = Limiter) -> + Now = ?NOW, + Inc = erlang:floor((Now - LastTime) * Rate / emqx_limiter_schema:minimum_period()), + Tokens2 = erlang:min(Tokens + Inc, Capacity), + if Need > Tokens2 -> + return_pause(Need, Rate, Limiter); + true -> + Limiter2 = Limiter#limiter{tokens = Tokens2, + lasttime = Now}, + try_consume_counter(Need, Limiter2) + end. + +-spec return_pause(pos_integer(), decimal(), Client) -> pause_result(Client) + when Client :: client(). +return_pause(_, infinity, Limiter) -> + %% workaround when emqx_limiter_server's rate is infinity + {pause, ?MINIUMN_PAUSE, Limiter}; + +return_pause(Diff, Rate, Limiter) -> + Pause = erlang:round(Diff * emqx_limiter_schema:minimum_period() / Rate), + {pause, erlang:max(Pause, ?MINIUMN_PAUSE), Limiter}. diff --git a/apps/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx_limiter/src/emqx_limiter_decimal.erl new file mode 100644 index 000000000..26ae611e8 --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_decimal.erl @@ -0,0 +1,79 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% a simple decimal module for rate-related calculations + +-module(emqx_limiter_decimal). + +%% API +-export([ add/2, sub/2, mul/2 + , add_to_counter/3, put_to_counter/3]). +-export_type([decimal/0, zero_or_float/0]). + +-type decimal() :: infinity | number(). +-type zero_or_float() :: 0 | float(). + +%%-------------------------------------------------------------------- +%%% API +%%-------------------------------------------------------------------- +-spec add(decimal(), decimal()) -> decimal(). +add(A, B) when A =:= infinity + orelse B =:= infinity -> + infinity; + +add(A, B) -> + A + B. + +-spec sub(decimal(), decimal()) -> decimal(). +sub(A, B) when A =:= infinity + orelse B =:= infinity -> + infinity; + +sub(A, B) -> + A - B. + +-spec mul(decimal(), decimal()) -> decimal(). +mul(A, B) when A =:= infinity + orelse B =:= infinity -> + infinity; + +mul(A, B) -> + A * B. + +-spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> + {zero_or_float(), zero_or_float()}. +add_to_counter(_, _, infinity) -> + {0, 0}; +add_to_counter(Counter, Index, Val) when is_float(Val) -> + IntPart = erlang:floor(Val), + if IntPart > 0 -> + counters:add(Counter, Index, IntPart); + true -> + ok + end, + {IntPart, Val - IntPart}; +add_to_counter(Counter, Index, Val) -> + counters:add(Counter, Index, Val), + {Val, 0}. + +-spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok. +put_to_counter(_, _, infinity) -> + ok; +put_to_counter(Counter, Index, Val) when is_float(Val) -> + IntPart = erlang:floor(Val), + counters:put(Counter, Index, IntPart); +put_to_counter(Counter, Index, Val) -> + counters:put(Counter, Index, Val). diff --git a/apps/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx_limiter/src/emqx_limiter_manager.erl new file mode 100644 index 000000000..471098242 --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_manager.erl @@ -0,0 +1,229 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_manager). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +%% API +-export([ start_link/0, start_server/1, find_counter/1 + , find_counter/3, insert_counter/4, insert_counter/6 + , make_path/3, restart_server/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, format_status/2]). + +-type path() :: list(atom()). +-type limiter_type() :: emqx_limiter_schema:limiter_type(). +-type zone_name() :: emqx_limiter_schema:zone_name(). +-type bucket_name() :: emqx_limiter_schema:bucket_name(). + +%% counter record in ets table +-record(element, {path :: path(), + counter :: counters:counters_ref(), + index :: index(), + rate :: rate() + }). + + +-type index() :: emqx_limiter_server:index(). +-type rate() :: emqx_limiter_decimal:decimal(). + +-define(TAB, emqx_limiter_counters). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec start_server(limiter_type()) -> _. +start_server(Type) -> + emqx_limiter_server_sup:start(Type). + +-spec restart_server(limiter_type()) -> _. +restart_server(Type) -> + emqx_limiter_server_sup:restart(Type). + +-spec find_counter(limiter_type(), zone_name(), bucket_name()) -> + {ok, counters:counters_ref(), index(), rate()} | undefined. +find_counter(Type, Zone, BucketId) -> + find_counter(make_path(Type, Zone, BucketId)). + +-spec find_counter(path()) -> + {ok, counters:counters_ref(), index(), rate()} | undefined. +find_counter(Path) -> + case ets:lookup(?TAB, Path) of + [#element{counter = Counter, index = Index, rate = Rate}] -> + {ok, Counter, Index, Rate}; + _ -> + undefined + end. + +-spec insert_counter(limiter_type(), + zone_name(), + bucket_name(), + counters:counters_ref(), + index(), + rate()) -> boolean(). +insert_counter(Type, Zone, BucketId, Counter, Index, Rate) -> + insert_counter(make_path(Type, Zone, BucketId), + Counter, + Index, + Rate). + +-spec insert_counter(path(), + counters:counters_ref(), + index(), + rate()) -> boolean(). +insert_counter(Path, Counter, Index, Rate) -> + ets:insert(?TAB, + #element{path = Path, + counter = Counter, + index = Index, + rate = Rate}). + +-spec make_path(limiter_type(), zone_name(), bucket_name()) -> path(). +make_path(Type, Name, BucketId) -> + [Type, Name, BucketId]. + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% @end +%%-------------------------------------------------------------------- +-spec start_link() -> {ok, Pid :: pid()} | + {error, Error :: {already_started, pid()}} | + {error, Error :: term()} | + ignore. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% @end +%%-------------------------------------------------------------------- +-spec init(Args :: term()) -> {ok, State :: term()} | + {ok, State :: term(), Timeout :: timeout()} | + {ok, State :: term(), hibernate} | + {stop, Reason :: term()} | + ignore. +init([]) -> + _ = ets:new(?TAB, [ set, public, named_table, {keypos, #element.path} + , {write_concurrency, true}, {read_concurrency, true} + , {heir, erlang:whereis(emqx_limiter_sup), none} + ]), + {ok, #{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | + {reply, Reply :: term(), NewState :: term(), hibernate} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}. +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignore, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), NewState :: term()}. +handle_cast(Req, State) -> + ?LOG(error, "Unexpected cast: ~p", [Req]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_info(Info :: timeout() | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: normal | term(), NewState :: term()}. +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term()) -> any(). +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +-spec code_change(OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term()) -> {ok, NewState :: term()} | + {error, Reason :: term()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for changing the form and appearance +%% of gen_server status when it is returned from sys:get_status/1,2 +%% or when it appears in termination error logs. +%% @end +%%-------------------------------------------------------------------- +-spec format_status(Opt :: normal | terminate, + Status :: list()) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- diff --git a/apps/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx_limiter/src/emqx_limiter_schema.erl new file mode 100644 index 000000000..0e2977025 --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_schema.erl @@ -0,0 +1,140 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_schema). + +-include_lib("typerefl/include/types.hrl"). + +-export([ roots/0, fields/1, to_rate/1 + , to_bucket_rate/1, minimum_period/0]). + +-define(KILOBYTE, 1024). + +-type limiter_type() :: bytes_in + | message_in + | connection + | message_routing. + +-type bucket_name() :: atom(). +-type zone_name() :: atom(). +-type rate() :: infinity | float(). +-type bucket_rate() :: list(infinity | number()). + +-typerefl_from_string({rate/0, ?MODULE, to_rate}). +-typerefl_from_string({bucket_rate/0, ?MODULE, to_bucket_rate}). + +-reflect_type([ rate/0 + , bucket_rate/0 + ]). + +-export_type([limiter_type/0, bucket_name/0, zone_name/0]). + +-import(emqx_schema, [sc/2, map/2]). + +roots() -> [emqx_limiter]. + +fields(emqx_limiter) -> + [ {bytes_in, sc(ref(limiter), #{})} + , {message_in, sc(ref(limiter), #{})} + , {connection, sc(ref(limiter), #{})} + , {message_routing, sc(ref(limiter), #{})} + ]; + +fields(limiter) -> + [ {global, sc(rate(), #{})} + , {zone, sc(map("zone name", rate()), #{})} + , {bucket, sc(map("bucket id", ref(bucket)), + #{desc => "Token Buckets"})} + ]; + +fields(bucket) -> + [ {zone, sc(atom(), #{desc => "the zone which the bucket in"})} + , {aggregated, sc(bucket_rate(), #{})} + , {per_client, sc(bucket_rate(), #{})} + ]. + +%% minimum period is 100ms +minimum_period() -> + 100. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +ref(Field) -> hoconsc:ref(?MODULE, Field). + +to_rate(Str) -> + Tokens = [string:trim(T) || T <- string:tokens(Str, "/")], + case Tokens of + ["infinity"] -> + {ok, infinity}; + [Quota, Interval] -> + {ok, Val} = to_quota(Quota), + case emqx_schema:to_duration_ms(Interval) of + {ok, Ms} when Ms > 0 -> + {ok, Val * minimum_period() / Ms}; + _ -> + {error, Str} + end; + _ -> + {error, Str} + end. + +to_bucket_rate(Str) -> + Tokens = [string:trim(T) || T <- string:tokens(Str, "/,")], + case Tokens of + [Rate, Capa] -> + {ok, infinity} = to_quota(Rate), + {ok, CapaVal} = to_quota(Capa), + if CapaVal =/= infinity -> + {ok, [infinity, CapaVal]}; + true -> + {error, Str} + end; + [Quota, Interval, Capacity] -> + {ok, Val} = to_quota(Quota), + case emqx_schema:to_duration_ms(Interval) of + {ok, Ms} when Ms > 0 -> + {ok, CapaVal} = to_quota(Capacity), + {ok, [Val * minimum_period() / Ms, CapaVal]}; + _ -> + {error, Str} + end; + _ -> + {error, Str} + end. + + +to_quota(Str) -> + {ok, MP} = re:compile("^\s*(?:(?:([1-9][0-9]*)([a-zA-z]*))|infinity)\s*$"), + Result = re:run(Str, MP, [{capture, all_but_first, list}]), + case Result of + {match, [Quota, Unit]} -> + Val = erlang:list_to_integer(Quota), + Unit2 = string:to_lower(Unit), + {ok, apply_unit(Unit2, Val)}; + {match, [Quota]} -> + {ok, erlang:list_to_integer(Quota)}; + {match, []} -> + {ok, infinity}; + _ -> + {error, Str} + end. + +apply_unit("", Val) -> Val; +apply_unit("kb", Val) -> Val * ?KILOBYTE; +apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; +apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; +apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). diff --git a/apps/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx_limiter/src/emqx_limiter_server.erl new file mode 100644 index 000000000..8a712db2e --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_server.erl @@ -0,0 +1,426 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% A hierachical token bucket algorithm +%% Note: this is not the linux HTB algorithm(http://luxik.cdi.cz/~devik/qos/htb/manual/theory.htm) +%% Algorithm: +%% 1. the root node periodically generates tokens and then distributes them +%% just like the oscillation of water waves +%% 2. the leaf node has a counter, which is the place where the token is actually held. +%% 3. other nodes only play the role of transmission, and the rate of the node is like a valve, +%% limiting the oscillation transmitted from the parent node + +-module(emqx_limiter_server). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, format_status/2]). + +-export([ start_link/1, connect/2, info/2 + , name/1]). + +-record(root, { rate :: rate() %% number of tokens generated per period + , period :: pos_integer() %% token generation interval(second) + , childs :: list(node_id()) %% node children + , consumed :: non_neg_integer() + }). + +-record(zone, { id :: pos_integer() + , name :: zone_name() + , rate :: rate() + , obtained :: non_neg_integer() %% number of tokens obtained + , childs :: list(node_id()) + }). + +-record(bucket, { id :: pos_integer() + , name :: bucket_name() + , rate :: rate() + , obtained :: non_neg_integer() + , correction :: emqx_limiter_decimal:zero_or_float() %% token correction value + , capacity :: capacity() + , counter :: counters:counters_ref() + , index :: index() + }). + +-record(state, { root :: undefined | root() + , counter :: undefined | counters:counters_ref() %% current counter to alloc + , index :: index() + , zones :: #{zone_name() => node_id()} + , nodes :: nodes() + , type :: limiter_type() + }). + +%% maybe use maps is better, but record is fastter +-define(FIELD_OBTAINED, #zone.obtained). +-define(GET_FIELD(F, Node), element(F, Node)). +-define(CALL(Type, Msg), gen_server:call(name(Type), {?FUNCTION_NAME, Msg})). + +-type node_id() :: pos_integer(). +-type root() :: #root{}. +-type zone() :: #zone{}. +-type bucket() :: #bucket{}. +-type node_data() :: zone() | bucket(). +-type nodes() :: #{node_id() => node_data()}. +-type zone_name() :: emqx_limiter_schema:zone_name(). +-type limiter_type() :: emqx_limiter_schema:limiter_type(). +-type bucket_name() :: emqx_limiter_schema:bucket_name(). +-type rate() :: decimal(). +-type flow() :: decimal(). +-type capacity() :: decimal(). +-type decimal() :: emqx_limiter_decimal:decimal(). +-type state() :: #state{}. +-type index() :: pos_integer(). + +-export_type([index/0]). +-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec connect(limiter_type(), bucket_name()) -> emqx_limiter_client:client(). +connect(Type, Bucket) -> + #{zone := Zone, + aggregated := [Aggr, Capacity], + per_client := [Client, ClientCapa]} = emqx:get_config([emqx_limiter, Type, bucket, Bucket]), + case emqx_limiter_manager:find_counter(Type, Zone, Bucket) of + {ok, Counter, Idx, Rate} -> + if Client =/= infinity andalso (Client < Aggr orelse ClientCapa < Capacity) -> + emqx_limiter_client:create(Client, ClientCapa, Counter, Idx, Rate); + true -> + emqx_limiter_client:make_ref(Counter, Idx, Rate) + end; + _ -> + ?LOG(error, "can't find the bucket:~p which type is:~p~n", [Bucket, Type]), + throw("invalid bucket") + end. + +-spec info(limiter_type(), atom()) -> term(). +info(Type, Info) -> + ?CALL(Type, Info). + +-spec name(limiter_type()) -> atom(). +name(Type) -> + erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% @end +%%-------------------------------------------------------------------- +-spec start_link(limiter_type()) -> _. +start_link(Type) -> + gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). + +%%-------------------------------------------------------------------- +%%% gen_server callbacks +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% @end +%%-------------------------------------------------------------------- +-spec init(Args :: term()) -> {ok, State :: term()} | + {ok, State :: term(), Timeout :: timeout()} | + {ok, State :: term(), hibernate} | + {stop, Reason :: term()} | + ignore. +init([Type]) -> + State = #state{zones = #{}, + nodes = #{}, + type = Type, + index = 1}, + State2 = init_tree(Type, State), + oscillate(State2#state.root#root.period), + {ok, State2}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | + {reply, Reply :: term(), NewState :: term(), hibernate} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}. +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: term(), NewState :: term()}. +handle_cast(Req, State) -> + ?LOG(error, "Unexpected cast: ~p", [Req]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% @end +%%-------------------------------------------------------------------- +-spec handle_info(Info :: timeout() | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), Timeout :: timeout()} | + {noreply, NewState :: term(), hibernate} | + {stop, Reason :: normal | term(), NewState :: term()}. +handle_info(oscillate, State) -> + {noreply, oscillation(State)}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% @end +%%-------------------------------------------------------------------- +-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), + State :: term()) -> any(). +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% @end +%%-------------------------------------------------------------------- +-spec code_change(OldVsn :: term() | {down, term()}, + State :: term(), + Extra :: term()) -> {ok, NewState :: term()} | + {error, Reason :: term()}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for changing the form and appearance +%% of gen_server status when it is returned from sys:get_status/1,2 +%% or when it appears in termination error logs. +%% @end +%%-------------------------------------------------------------------- +-spec format_status(Opt :: normal | terminate, + Status :: list()) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +oscillate(Interval) -> + erlang:send_after(Interval, self(), ?FUNCTION_NAME). + +%% @doc generate tokens, and then spread to leaf nodes +-spec oscillation(state()) -> state(). +oscillation(#state{root = #root{rate = Flow, + period = Interval, + childs = ChildIds, + consumed = Consumed} = Root, + nodes = Nodes} = State) -> + oscillate(Interval), + Childs = get_orderd_childs(ChildIds, Nodes), + {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), + State#state{nodes = Nodes2, + root = Root#root{consumed = Consumed + Alloced}}. + +%% @doc horizontal spread +-spec transverse(list(node_data()), + flow(), + non_neg_integer(), + nodes()) -> {non_neg_integer(), nodes()}. +transverse([H | T], InFlow, Alloced, Nodes) when InFlow > 0 -> + {NodeAlloced, Nodes2} = longitudinal(H, InFlow, Nodes), + InFlow2 = sub(InFlow, NodeAlloced), + Alloced2 = Alloced + NodeAlloced, + transverse(T, InFlow2, Alloced2, Nodes2); + +transverse(_, _, Alloced, Nodes) -> + {Alloced, Nodes}. + +%% @doc vertical spread +-spec longitudinal(node_data(), flow(), nodes()) -> + {non_neg_integer(), nodes()}. +longitudinal(#zone{id = Id, + rate = Rate, + obtained = Obtained, + childs = ChildIds} = Node, InFlow, Nodes) -> + Flow = erlang:min(InFlow, Rate), + + if Flow > 0 -> + Childs = get_orderd_childs(ChildIds, Nodes), + {Alloced, Nodes2} = transverse(Childs, Flow, 0, Nodes), + if Alloced > 0 -> + {Alloced, + Nodes2#{Id => Node#zone{obtained = Obtained + Alloced}}}; + true -> + %% childs are empty or all counter childs are full + {0, Nodes} + end; + true -> + {0, Nodes} + end; + +longitudinal(#bucket{id = Id, + rate = Rate, + capacity = Capacity, + correction = Correction, + counter = Counter, + index = Index, + obtained = Obtained} = Node, InFlow, Nodes) -> + Flow = add(erlang:min(InFlow, Rate), Correction), + + Tokens = counters:get(Counter, Index), + %% toknes's value mayb be a negative value(stolen from the future) + Avaiable = erlang:min(if Tokens < 0 -> + add(Capacity, Tokens); + true -> + sub(Capacity, Tokens) + end, Flow), + FixAvaiable = erlang:min(Capacity, Avaiable), + if FixAvaiable > 0 -> + {Alloced, Decimal} = add_to_counter(Counter, Index, FixAvaiable), + + {Alloced, + Nodes#{Id => Node#bucket{obtained = Obtained + Alloced, + correction = Decimal}}}; + true -> + {0, Nodes} + end. + +-spec get_orderd_childs(list(node_id()), nodes()) -> list(node_data()). +get_orderd_childs(Ids, Nodes) -> + Childs = [maps:get(Id, Nodes) || Id <- Ids], + + %% sort by obtained, avoid node goes hungry + lists:sort(fun(A, B) -> + ?GET_FIELD(?FIELD_OBTAINED, A) < ?GET_FIELD(?FIELD_OBTAINED, B) + end, + Childs). + +-spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state(). +init_tree(Type, State) -> + #{global := Global, + zone := Zone, + bucket := Bucket} = emqx:get_config([emqx_limiter, Type]), + {Factor, Root} = make_root(Global, Zone), + State2 = State#state{root = Root}, + {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), + State4 = State3#state{counter = counters:new(maps:size(Bucket), + [write_concurrency])}, + make_bucket(maps:to_list(Bucket), Factor, NodeId, State4). + +-spec make_root(decimal(), hocon:config()) -> {number(), root()}. +make_root(Rate, Zone) -> + ZoneNum = maps:size(Zone), + Childs = lists:seq(1, ZoneNum), + MiniPeriod = emqx_limiter_schema:minimum_period(), + if Rate >= 1 -> + {1, #root{rate = Rate, + period = MiniPeriod, + childs = Childs, + consumed = 0}}; + true -> + Factor = 1 / Rate, + {Factor, #root{rate = 1, + period = erlang:floor(Factor * MiniPeriod), + childs = Childs, + consumed = 0}} + end. + +make_zone([{Name, Rate} | T], Factor, NodeId, State) -> + #state{zones = Zones, nodes = Nodes} = State, + Zone = #zone{id = NodeId, + name = Name, + rate = mul(Rate, Factor), + obtained = 0, + childs = []}, + State2 = State#state{zones = Zones#{Name => NodeId}, + nodes = Nodes#{NodeId => Zone}}, + make_zone(T, Factor, NodeId + 1, State2); + +make_zone([], _, NodeId, State2) -> + {NodeId, State2}. + +make_bucket([{Name, Conf} | T], Factor, NodeId, State) -> + #{zone := ZoneName, + aggregated := [Rate, Capacity]} = Conf, + {Counter, Idx, State2} = alloc_counter(ZoneName, Name, Rate, State), + Node = #bucket{ id = NodeId + , name = Name + , rate = mul(Rate, Factor) + , obtained = 0 + , correction = 0 + , capacity = Capacity + , counter = Counter + , index = Idx}, + State3 = add_zone_child(NodeId, Node, ZoneName, State2), + make_bucket(T, Factor, NodeId + 1, State3); + +make_bucket([], _, _, State) -> + State. + +-spec alloc_counter(zone_name(), bucket_name(), rate(), state()) -> + {counters:counters_ref(), pos_integer(), state()}. +alloc_counter(Zone, Bucket, Rate, + #state{type = Type, counter = Counter, index = Index} = State) -> + Path = emqx_limiter_manager:make_path(Type, Zone, Bucket), + case emqx_limiter_manager:find_counter(Path) of + undefined -> + init_counter(Path, Counter, Index, + Rate, State#state{index = Index + 1}); + {ok, ECounter, EIndex, _} -> + init_counter(Path, ECounter, EIndex, Rate, State) + end. + +init_counter(Path, Counter, Index, Rate, State) -> + _ = put_to_counter(Counter, Index, 0), + emqx_limiter_manager:insert_counter(Path, Counter, Index, Rate), + {Counter, Index, State}. + +-spec add_zone_child(node_id(), bucket(), zone_name(), state()) -> state(). +add_zone_child(NodeId, Bucket, Name, #state{zones = Zones, nodes = Nodes} = State) -> + ZoneId = maps:get(Name, Zones), + #zone{childs = Childs} = Zone = maps:get(ZoneId, Nodes), + Nodes2 = Nodes#{ZoneId => Zone#zone{childs = [NodeId | Childs]}, + NodeId => Bucket}, + State#state{nodes = Nodes2}. diff --git a/apps/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx_limiter/src/emqx_limiter_server_sup.erl new file mode 100644 index 000000000..56a2dd2dc --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -0,0 +1,94 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_server_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start/1, restart/1]). + +%% Supervisor callbacks +-export([init/1]). + +%%--================================================================== +%% API functions +%%--================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the supervisor +%% @end +%%-------------------------------------------------------------------- +-spec start_link() -> {ok, Pid :: pid()} | + {error, {already_started, Pid :: pid()}} | + {error, {shutdown, term()}} | + {error, term()} | + ignore. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start(emqx_limiter_schema:limiter_type()) -> _. +start(Type) -> + Spec = make_child(Type), + supervisor:start_child(?MODULE, Spec). + +-spec restart(emqx_limiter_schema:limiter_type()) -> _. +restart(Type) -> + Id = emqx_limiter_server:name(Type), + _ = supervisor:terminate_child(?MODULE, Id), + supervisor:restart_child(?MODULE, Id). + +%%--================================================================== +%% Supervisor callbacks +%%--================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart intensity, and child +%% specifications. +%% @end +%%-------------------------------------------------------------------- +-spec init(Args :: term()) -> + {ok, {SupFlags :: supervisor:sup_flags(), + [ChildSpec :: supervisor:child_spec()]}} | + ignore. +init([]) -> + SupFlags = #{strategy => one_for_one, + intensity => 10, + period => 3600}, + + {ok, {SupFlags, childs()}}. + +%%--================================================================== +%% Internal functions +%%--================================================================== +make_child(Type) -> + Id = emqx_limiter_server:name(Type), + #{id => Id, + start => {emqx_limiter_server, start_link, [Type]}, + restart => transient, + shutdown => 5000, + type => worker, + modules => [emqx_limiter_server]}. + +childs() -> + Conf = emqx:get_config([emqx_limiter]), + Types = maps:keys(Conf), + [make_child(Type) || Type <- Types]. diff --git a/apps/emqx_limiter/src/emqx_limiter_sup.erl b/apps/emqx_limiter/src/emqx_limiter_sup.erl new file mode 100644 index 000000000..957f053af --- /dev/null +++ b/apps/emqx_limiter/src/emqx_limiter_sup.erl @@ -0,0 +1,76 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%%-------------------------------------------------------------------- +%% API functions +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the supervisor +%% @end +%%-------------------------------------------------------------------- +-spec start_link() -> {ok, Pid :: pid()} | + {error, {already_started, Pid :: pid()}} | + {error, {shutdown, term()}} | + {error, term()} | + ignore. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart intensity, and child +%% specifications. +%% @end +%%-------------------------------------------------------------------- +-spec init(Args :: term()) -> + {ok, {SupFlags :: supervisor:sup_flags(), + [ChildSpec :: supervisor:child_spec()]}} | + ignore. +init([]) -> + SupFlags = #{strategy => one_for_one, + intensity => 10, + period => 3600}, + + Childs = [ make_child(emqx_limiter_manager, worker) + , make_child(emqx_limiter_server_sup, supervisor)], + + {ok, {SupFlags, Childs}}. + +make_child(Mod, Type) -> + #{id => Mod, + start => {Mod, start_link, []}, + restart => transient, + type => Type, + modules => [Mod]}. diff --git a/apps/emqx_limiter/test/emqx_limiter_SUITE.erl b/apps/emqx_limiter/test/emqx_limiter_SUITE.erl new file mode 100644 index 000000000..411c80fab --- /dev/null +++ b/apps/emqx_limiter/test/emqx_limiter_SUITE.erl @@ -0,0 +1,272 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_limiter_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(APP, emqx_limiter). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(BASE_CONF, <<""" +emqx_limiter { + bytes_in {global = \"100KB/10s\" + zone.default = \"100kB/10s\" + zone.external = \"20kB/10s\" + bucket.tcp {zone = default + aggregated = \"100kB/10s,1Mb\" + per_client = \"100KB/10s,10Kb\"} + bucket.ssl {zone = external + aggregated = \"100kB/10s,1Mb\" + per_client = \"100KB/10s,10Kb\"} + } + + message_in {global = \"100/10s\" + zone.default = \"100/10s\" + bucket.bucket1 {zone = default + aggregated = \"100/10s,1000\" + per_client = \"100/10s,100\"} + } + + connection {global = \"100/10s\" + zone.default = \"100/10s\" + bucket.bucket1 {zone = default + aggregated = \"100/10s,100\" + per_client = \"100/10s,10\" + } + } + + message_routing {global = \"100/10s\" + zone.default = \"100/10s\" + bucket.bucket1 {zone = default + aggregated = \"100/10s,100\" + per_client = \"100/10s,10\" + } + } +}""">>). + +-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). + +-record(client_options, { interval :: non_neg_integer() + , per_cost :: non_neg_integer() + , type :: atom() + , bucket :: atom() + , lifetime :: non_neg_integer() + , rates :: list(tuple()) + }). + +-record(client_state, { client :: emqx_limiter_client:limiter() + , pid :: pid() + , got :: non_neg_integer() + , options :: #client_options{}}). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF), + emqx_ct_helpers:start_apps([?APP]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([?APP]). + +init_per_testcase(_TestCase, Config) -> + Config. + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- +t_un_overload(_) -> + Conf = emqx:get_config([emqx_limiter]), + Conn = #{global => to_rate("infinity"), + zone => #{z1 => to_rate("1000/1s"), + z2 => to_rate("1000/1s")}, + bucket => #{b1 => #{zone => z1, + aggregated => to_bucket_rate("100/1s, 500"), + per_client => to_bucket_rate("10/1s, 50")}, + b2 => #{zone => z2, + aggregated => to_bucket_rate("500/1s, 500"), + per_client => to_bucket_rate("100/1s, infinity") + }}}, + Conf2 = Conf#{connection => Conn}, + emqx_config:put([emqx_limiter], Conf2), + {ok, _} = emqx_limiter_manager:restart_server(connection), + + timer:sleep(200), + + B1C = #client_options{interval = 100, + per_cost = 1, + type = connection, + bucket = b1, + lifetime = timer:seconds(3), + rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]}, + {fun erlang:'=:='/2, ["10/1s"]}]}, + + B2C = #client_options{interval = 100, + per_cost = 10, + type = connection, + bucket = b2, + lifetime = timer:seconds(3), + rates = [{fun erlang:'=<'/2, ["1000/1s", "500/1s"]}, + {fun erlang:'=:='/2, ["100/1s"]}]}, + + lists:foreach(fun(_) -> start_client(B1C) end, + lists:seq(1, 10)), + + + lists:foreach(fun(_) -> start_client(B2C) end, + lists:seq(1, 5)), + + ?assert(check_client_result(10 + 5)). + +t_infinity(_) -> + Conf = emqx:get_config([emqx_limiter]), + Conn = #{global => to_rate("infinity"), + zone => #{z1 => to_rate("1000/1s"), + z2 => to_rate("infinity")}, + bucket => #{b1 => #{zone => z1, + aggregated => to_bucket_rate("100/1s, infinity"), + per_client => to_bucket_rate("10/1s, 100")}, + b2 => #{zone => z2, + aggregated => to_bucket_rate("infinity, 600"), + per_client => to_bucket_rate("100/1s, infinity") + }}}, + Conf2 = Conf#{connection => Conn}, + emqx_config:put([emqx_limiter], Conf2), + {ok, _} = emqx_limiter_manager:restart_server(connection), + + timer:sleep(200), + + B1C = #client_options{interval = 100, + per_cost = 1, + type = connection, + bucket = b1, + lifetime = timer:seconds(3), + rates = [{fun erlang:'=<'/2, ["1000/1s", "100/1s"]}, + {fun erlang:'=:='/2, ["10/1s"]}]}, + + B2C = #client_options{interval = 100, + per_cost = 10, + type = connection, + bucket = b2, + lifetime = timer:seconds(3), + rates = [{fun erlang:'=:='/2, ["100/1s"]}]}, + + lists:foreach(fun(_) -> start_client(B1C) end, + lists:seq(1, 8)), + + lists:foreach(fun(_) -> start_client(B2C) end, + lists:seq(1, 4)), + + ?assert(check_client_result(8 + 4)). + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +start_client(Opts) -> + Pid = self(), + erlang:spawn(fun() -> enter_client(Opts, Pid) end). + +enter_client(#client_options{type = Type, + bucket = Bucket, + lifetime = Lifetime} = Opts, + Pid) -> + erlang:send_after(Lifetime, self(), stop), + erlang:send(self(), consume), + Client = emqx_limiter_server:connect(Type, Bucket), + client_loop(#client_state{client = Client, + pid = Pid, + got = 0, + options = Opts}). + +client_loop(#client_state{client = Client, + got = Got, + pid = Pid, + options = #client_options{interval = Interval, + per_cost = PerCost, + lifetime = Lifetime, + rates = Rates}} = State) -> + receive + consume -> + case emqx_limiter_client:consume(PerCost, Client) of + {ok, Client2} -> + erlang:send_after(Interval, self(), consume), + client_loop(State#client_state{client = Client2, + got = Got + PerCost}); + {pause, MS, Client2} -> + erlang:send_after(MS, self(), {resume, erlang:system_time(millisecond)}), + client_loop(State#client_state{client = Client2}) + end; + stop -> + Rate = Got * emqx_limiter_schema:minimum_period() / Lifetime, + ?LOGT("Got:~p, Rate is:~p Checks:~p~n", [Got, Rate, Rate]), + Check = check_rates(Rate, Rates), + erlang:send(Pid, {client, Check}); + {resume, Begin} -> + case emqx_limiter_client:consume(PerCost, Client) of + {ok, Client2} -> + Now = erlang:system_time(millisecond), + Diff = erlang:max(0, Interval - (Now - Begin)), + erlang:send_after(Diff, self(), consume), + client_loop(State#client_state{client = Client2, + got = Got + PerCost}); + {pause, MS, Client2} -> + erlang:send_after(MS, self(), {resume, Begin}), + client_loop(State#client_state{client = Client2}) + end + end. + +check_rates(Rate, [{Fun, Rates} | T]) -> + case lists:all(fun(E) -> Fun(Rate, to_rate(E)) end, Rates) of + true -> + check_rates(Rate, T); + false -> + false + end; +check_rates(_, _) -> + true. + +check_client_result(0) -> + true; + +check_client_result(N) -> + ?LOGT("check_client_result:~p~n", [N]), + receive + {client, true} -> + check_client_result(N - 1); + {client, false} -> + false; + Any -> + ?LOGT(">>>> other:~p~n", [Any]) + + after 3500 -> + ?LOGT(">>>> timeout~n", []), + false + end. + +to_rate(Str) -> + {ok, Rate} = emqx_limiter_schema:to_rate(Str), + Rate. + +to_bucket_rate(Str) -> + {ok, Result} = emqx_limiter_schema:to_bucket_rate(Str), + Result. diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 369d7b3c5..a124166e5 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -54,6 +54,7 @@ , emqx_rule_engine_schema , emqx_exhook_schema , emqx_psk_schema + , emqx_limiter_schema ]). namespace() -> undefined. diff --git a/rebar.config.erl b/rebar.config.erl index 6f56ce1d2..0d9503a3c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -279,6 +279,7 @@ relx_apps(ReleaseType) -> , emqx_statsd , emqx_prometheus , emqx_psk + , emqx_limiter ] ++ [quicer || is_quicer_supported()] ++ [emqx_license || is_enterprise()]