diff --git a/apps/emqx/src/emqx_limiter.erl b/apps/emqx/src/emqx_limiter.erl deleted file mode 100644 index f8874234a..000000000 --- a/apps/emqx/src/emqx_limiter.erl +++ /dev/null @@ -1,184 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% Ratelimit or Quota checker --module(emqx_limiter). - --include("types.hrl"). - --export([ - init/2, - %% XXX: Compatible with before 4.2 version - init/4, - info/1, - check/2 -]). - --record(limiter, { - %% Zone - zone :: atom(), - %% Checkers - checkers :: [checker()] -}). - --type checker() :: #{ - name := name(), - capacity := non_neg_integer(), - interval := non_neg_integer(), - consumer := esockd_rate_limit:bucket() | atom() -}. - --type name() :: - conn_bytes_in - | conn_messages_in - | conn_messages_routing - | overall_messages_routing. - --type policy() :: [{name(), esockd_rate_limit:config()}]. - --type info() :: #{ - name() := - #{ - tokens := non_neg_integer(), - capacity := non_neg_integer(), - interval := non_neg_integer() - } -}. - --type limiter() :: #limiter{}. - --dialyzer({nowarn_function, [consume/3]}). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - --spec init( - atom(), - maybe(esockd_rate_limit:config()), - maybe(esockd_rate_limit:config()), - policy() -) -> - maybe(limiter()). -init(Zone, PubLimit, BytesIn, Specs) -> - Merged = maps:merge( - #{ - conn_messages_in => PubLimit, - conn_bytes_in => BytesIn - }, - maps:from_list(Specs) - ), - Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged), - init(Zone, maps:to_list(Filtered)). - --spec init(atom(), policy()) -> maybe(limiter()). -init(_Zone, []) -> - undefined; -init(Zone, Specs) -> - #limiter{zone = Zone, checkers = [do_init_checker(Zone, Spec) || Spec <- Specs]}. - -%% @private -do_init_checker(Zone, {Name, {Capacity, Interval}}) -> - Ck = #{name => Name, capacity => Capacity, interval => Interval}, - case is_overall_limiter(Name) of - true -> - case catch esockd_limiter:lookup({Zone, Name}) of - _Info when is_map(_Info) -> - ignore; - _ -> - esockd_limiter:create({Zone, Name}, Capacity, Interval) - end, - Ck#{consumer => Zone}; - _ -> - Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)} - end. - --spec info(limiter()) -> info(). -info(#limiter{zone = Zone, checkers = Cks}) -> - maps:from_list([get_info(Zone, Ck) || Ck <- Cks]). - --spec check( - #{ - cnt := Cnt :: non_neg_integer(), - oct := Oct :: non_neg_integer() - }, - Limiter :: limiter() -) -> - {ok, NLimiter :: limiter()} - | {pause, MilliSecs :: non_neg_integer(), NLimiter :: limiter()}. -check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{checkers = Cks}) -> - {Pauses, NCks} = do_check(Cnt, Oct, Cks, [], []), - case lists:max(Pauses) of - I when I > 0 -> - {pause, I, Limiter#limiter{checkers = NCks}}; - _ -> - {ok, Limiter#limiter{checkers = NCks}} - end. - -%% @private -do_check(_, _, [], Pauses, NCks) -> - {Pauses, lists:reverse(NCks)}; -do_check(Pubs, Bytes, [Ck | More], Pauses, Acc) -> - {I, NConsumer} = consume(Pubs, Bytes, Ck), - do_check(Pubs, Bytes, More, [I | Pauses], [Ck#{consumer := NConsumer} | Acc]). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> - Tokens = - case is_message_limiter(Name) of - true -> Pubs; - _ -> Bytes - end, - case Tokens =:= 0 of - true -> - {0, Cons}; - _ -> - case is_overall_limiter(Name) of - true -> - {_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens), - {Intv, Cons}; - _ -> - esockd_rate_limit:check(Tokens, Cons) - end - end. - -get_info(Zone, #{ - name := Name, - capacity := Cap, - interval := Intv, - consumer := Cons -}) -> - Info = - case is_overall_limiter(Name) of - true -> esockd_limiter:lookup({Zone, Name}); - _ -> esockd_rate_limit:info(Cons) - end, - {Name, #{ - capacity => Cap, - interval => Intv, - tokens => maps:get(tokens, Info) - }}. - -is_overall_limiter(overall_messages_routing) -> true; -is_overall_limiter(_) -> false. - -is_message_limiter(conn_messages_in) -> true; -is_message_limiter(conn_messages_routing) -> true; -is_message_limiter(overall_messages_routing) -> true; -is_message_limiter(_) -> false. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f8c4ad335..0f9b29927 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -147,11 +147,6 @@ roots(medium) -> ref("sys_topics"), #{desc => ?DESC(sys_topics)} )}, - {"rate_limit", - sc( - ref("rate_limit"), - #{} - )}, {"force_shutdown", sc( ref("force_shutdown"), @@ -545,33 +540,6 @@ fields("mqtt") -> fields("zone") -> Fields = emqx_zone_schema:roots(), [{F, ref(emqx_zone_schema, F)} || F <- Fields]; -fields("rate_limit") -> - [ - {"max_conn_rate", - sc( - hoconsc:union([infinity, integer()]), - #{ - default => 1000, - desc => ?DESC(fields_rate_limit_max_conn_rate) - } - )}, - {"conn_messages_in", - sc( - hoconsc:union([infinity, comma_separated_list()]), - #{ - default => infinity, - desc => ?DESC(fields_rate_limit_conn_messages_in) - } - )}, - {"conn_bytes_in", - sc( - hoconsc:union([infinity, comma_separated_list()]), - #{ - default => infinity, - desc => ?DESC(fields_rate_limit_conn_bytes_in) - } - )} - ]; fields("flapping_detect") -> [ {"enable", @@ -1602,8 +1570,6 @@ desc("zone") -> " - `force_shutdown.*`\n" " - `conn_congestion.*`\n" " - `force_gc.*`\n\n"; -desc("rate_limit") -> - "Rate limit settings."; desc("flapping_detect") -> "This config controls the allowed maximum number of `CONNECT` packets received\n" "from the same clientid in a time frame defined by `window_time`.\n" diff --git a/apps/emqx/test/emqx_limiter_SUITE.erl b/apps/emqx/test/emqx_limiter_SUITE.erl deleted file mode 100644 index 423a3d4ed..000000000 --- a/apps/emqx/test/emqx_limiter_SUITE.erl +++ /dev/null @@ -1,83 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_limiter_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -%%-------------------------------------------------------------------- -%% Setups -%%-------------------------------------------------------------------- - -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_testcase(_, Cfg) -> - _ = esockd_limiter:start_link(), - Cfg. - -end_per_testcase(_, _) -> - esockd_limiter:stop(). - -%%-------------------------------------------------------------------- -%% Cases -%%-------------------------------------------------------------------- - -t_init(_) -> - Cap1 = 1000, - Intv1 = 10, - Cap2 = 2000, - Intv2 = 15, - undefined = emqx_limiter:init(external, undefined, undefined, []), - #{ - conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2}, - conn_messages_in := #{capacity := Cap1, interval := Intv1, tokens := Cap1} - } = - emqx_limiter:info( - emqx_limiter:init(external, {Cap1, Intv1}, {Cap2, Intv2}, []) - ), - #{conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2}} = - emqx_limiter:info( - emqx_limiter:init(external, undefined, {Cap1, Intv1}, [{conn_bytes_in, {Cap2, Intv2}}]) - ). - -t_check_conn(_) -> - Limiter = emqx_limiter:init(external, [{conn_bytes_in, {100, 1}}]), - - {ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter), - #{conn_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2), - - {pause, 10, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter), - #{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter3), - - {pause, 100000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3), - #{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4). - -t_check_overall(_) -> - Limiter = emqx_limiter:init(external, [{overall_messages_routing, {100, 1}}]), - - {ok, Limiter2} = emqx_limiter:check(#{cnt => 1, oct => 0}, Limiter), - #{overall_messages_routing := #{tokens := 99}} = emqx_limiter:info(Limiter2), - - %% XXX: P = 1/r = 1/100 * 1000 = 10ms ? - {pause, _, Limiter3} = emqx_limiter:check(#{cnt => 100, oct => 0}, Limiter), - #{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter2), - - %% XXX: P = 10000/r = 10000/100 * 1000 = 100s ? - {pause, _, Limiter4} = emqx_limiter:check(#{cnt => 10000, oct => 0}, Limiter3), - #{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter4). diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 676b024dd..9e65c7eea 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -63,7 +63,7 @@ %% The {active, N} option active_n :: pos_integer(), %% Limiter - limiter :: maybe(emqx_limiter:limiter()), + limiter :: maybe(emqx_htb_limiter:limiter()), %% Limit Timer limit_timer :: maybe(reference()), %% Parse State @@ -277,7 +277,7 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) -> conn_mod => ?MODULE }, ActiveN = emqx_gateway_utils:active_n(Options), - %% FIXME: + %% FIXME: TODO %%Limiter = emqx_limiter:init(Options), Limiter = undefined, FrameOpts = emqx_gateway_utils:frame_options(Options), @@ -883,25 +883,31 @@ handle_info(Info, State) -> %%-------------------------------------------------------------------- %% Ensure rate limit -ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> - case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of - false -> - State; - {ok, Limiter1} -> - State#state{limiter = Limiter1}; - {pause, Time, Limiter1} -> - %% XXX: which limiter reached? - ?SLOG(warning, #{ - msg => "reach_rate_limit", - pause => Time - }), - TRef = emqx_misc:start_timer(Time, limit_timeout), - State#state{ - sockstate = blocked, - limiter = Limiter1, - limit_timer = TRef - } - end. +%% ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> +%% case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of +%% false -> +%% State; +%% {ok, Limiter1} -> +%% State#state{limiter = Limiter1}; +%% {pause, Time, Limiter1} -> +%% %% XXX: which limiter reached? +%% ?SLOG(warning, #{ +%% msg => "reach_rate_limit", +%% pause => Time +%% }), +%% TRef = emqx_misc:start_timer(Time, limit_timeout), +%% State#state{ +%% sockstate = blocked, +%% limiter = Limiter1, +%% limit_timer = TRef +%% } +%% end. + +%% TODO +%% Why do we need this? +%% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly? +ensure_rate_limit(_Stats, State) -> + State. %%-------------------------------------------------------------------- %% Run GC and Check OOM