fix(limiter): remove unused rate limit codes

This commit is contained in:
firest 2022-05-07 11:45:19 +08:00
parent b059eeda0a
commit ba683167bd
4 changed files with 27 additions and 322 deletions

View File

@ -1,184 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Ratelimit or Quota checker
-module(emqx_limiter).
-include("types.hrl").
-export([
init/2,
%% XXX: Compatible with before 4.2 version
init/4,
info/1,
check/2
]).
-record(limiter, {
%% Zone
zone :: atom(),
%% Checkers
checkers :: [checker()]
}).
-type checker() :: #{
name := name(),
capacity := non_neg_integer(),
interval := non_neg_integer(),
consumer := esockd_rate_limit:bucket() | atom()
}.
-type name() ::
conn_bytes_in
| conn_messages_in
| conn_messages_routing
| overall_messages_routing.
-type policy() :: [{name(), esockd_rate_limit:config()}].
-type info() :: #{
name() :=
#{
tokens := non_neg_integer(),
capacity := non_neg_integer(),
interval := non_neg_integer()
}
}.
-type limiter() :: #limiter{}.
-dialyzer({nowarn_function, [consume/3]}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec init(
atom(),
maybe(esockd_rate_limit:config()),
maybe(esockd_rate_limit:config()),
policy()
) ->
maybe(limiter()).
init(Zone, PubLimit, BytesIn, Specs) ->
Merged = maps:merge(
#{
conn_messages_in => PubLimit,
conn_bytes_in => BytesIn
},
maps:from_list(Specs)
),
Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
init(Zone, maps:to_list(Filtered)).
-spec init(atom(), policy()) -> maybe(limiter()).
init(_Zone, []) ->
undefined;
init(Zone, Specs) ->
#limiter{zone = Zone, checkers = [do_init_checker(Zone, Spec) || Spec <- Specs]}.
%% @private
do_init_checker(Zone, {Name, {Capacity, Interval}}) ->
Ck = #{name => Name, capacity => Capacity, interval => Interval},
case is_overall_limiter(Name) of
true ->
case catch esockd_limiter:lookup({Zone, Name}) of
_Info when is_map(_Info) ->
ignore;
_ ->
esockd_limiter:create({Zone, Name}, Capacity, Interval)
end,
Ck#{consumer => Zone};
_ ->
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
end.
-spec info(limiter()) -> info().
info(#limiter{zone = Zone, checkers = Cks}) ->
maps:from_list([get_info(Zone, Ck) || Ck <- Cks]).
-spec check(
#{
cnt := Cnt :: non_neg_integer(),
oct := Oct :: non_neg_integer()
},
Limiter :: limiter()
) ->
{ok, NLimiter :: limiter()}
| {pause, MilliSecs :: non_neg_integer(), NLimiter :: limiter()}.
check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{checkers = Cks}) ->
{Pauses, NCks} = do_check(Cnt, Oct, Cks, [], []),
case lists:max(Pauses) of
I when I > 0 ->
{pause, I, Limiter#limiter{checkers = NCks}};
_ ->
{ok, Limiter#limiter{checkers = NCks}}
end.
%% @private
do_check(_, _, [], Pauses, NCks) ->
{Pauses, lists:reverse(NCks)};
do_check(Pubs, Bytes, [Ck | More], Pauses, Acc) ->
{I, NConsumer} = consume(Pubs, Bytes, Ck),
do_check(Pubs, Bytes, More, [I | Pauses], [Ck#{consumer := NConsumer} | Acc]).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
Tokens =
case is_message_limiter(Name) of
true -> Pubs;
_ -> Bytes
end,
case Tokens =:= 0 of
true ->
{0, Cons};
_ ->
case is_overall_limiter(Name) of
true ->
{_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens),
{Intv, Cons};
_ ->
esockd_rate_limit:check(Tokens, Cons)
end
end.
get_info(Zone, #{
name := Name,
capacity := Cap,
interval := Intv,
consumer := Cons
}) ->
Info =
case is_overall_limiter(Name) of
true -> esockd_limiter:lookup({Zone, Name});
_ -> esockd_rate_limit:info(Cons)
end,
{Name, #{
capacity => Cap,
interval => Intv,
tokens => maps:get(tokens, Info)
}}.
is_overall_limiter(overall_messages_routing) -> true;
is_overall_limiter(_) -> false.
is_message_limiter(conn_messages_in) -> true;
is_message_limiter(conn_messages_routing) -> true;
is_message_limiter(overall_messages_routing) -> true;
is_message_limiter(_) -> false.

View File

@ -147,11 +147,6 @@ roots(medium) ->
ref("sys_topics"), ref("sys_topics"),
#{desc => ?DESC(sys_topics)} #{desc => ?DESC(sys_topics)}
)}, )},
{"rate_limit",
sc(
ref("rate_limit"),
#{}
)},
{"force_shutdown", {"force_shutdown",
sc( sc(
ref("force_shutdown"), ref("force_shutdown"),
@ -545,33 +540,6 @@ fields("mqtt") ->
fields("zone") -> fields("zone") ->
Fields = emqx_zone_schema:roots(), Fields = emqx_zone_schema:roots(),
[{F, ref(emqx_zone_schema, F)} || F <- Fields]; [{F, ref(emqx_zone_schema, F)} || F <- Fields];
fields("rate_limit") ->
[
{"max_conn_rate",
sc(
hoconsc:union([infinity, integer()]),
#{
default => 1000,
desc => ?DESC(fields_rate_limit_max_conn_rate)
}
)},
{"conn_messages_in",
sc(
hoconsc:union([infinity, comma_separated_list()]),
#{
default => infinity,
desc => ?DESC(fields_rate_limit_conn_messages_in)
}
)},
{"conn_bytes_in",
sc(
hoconsc:union([infinity, comma_separated_list()]),
#{
default => infinity,
desc => ?DESC(fields_rate_limit_conn_bytes_in)
}
)}
];
fields("flapping_detect") -> fields("flapping_detect") ->
[ [
{"enable", {"enable",
@ -1602,8 +1570,6 @@ desc("zone") ->
" - `force_shutdown.*`\n" " - `force_shutdown.*`\n"
" - `conn_congestion.*`\n" " - `conn_congestion.*`\n"
" - `force_gc.*`\n\n"; " - `force_gc.*`\n\n";
desc("rate_limit") ->
"Rate limit settings.";
desc("flapping_detect") -> desc("flapping_detect") ->
"This config controls the allowed maximum number of `CONNECT` packets received\n" "This config controls the allowed maximum number of `CONNECT` packets received\n"
"from the same clientid in a time frame defined by `window_time`.\n" "from the same clientid in a time frame defined by `window_time`.\n"

View File

@ -1,83 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Cfg) ->
_ = esockd_limiter:start_link(),
Cfg.
end_per_testcase(_, _) ->
esockd_limiter:stop().
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_init(_) ->
Cap1 = 1000,
Intv1 = 10,
Cap2 = 2000,
Intv2 = 15,
undefined = emqx_limiter:init(external, undefined, undefined, []),
#{
conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2},
conn_messages_in := #{capacity := Cap1, interval := Intv1, tokens := Cap1}
} =
emqx_limiter:info(
emqx_limiter:init(external, {Cap1, Intv1}, {Cap2, Intv2}, [])
),
#{conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2}} =
emqx_limiter:info(
emqx_limiter:init(external, undefined, {Cap1, Intv1}, [{conn_bytes_in, {Cap2, Intv2}}])
).
t_check_conn(_) ->
Limiter = emqx_limiter:init(external, [{conn_bytes_in, {100, 1}}]),
{ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter),
#{conn_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2),
{pause, 10, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter),
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter3),
{pause, 100000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3),
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
t_check_overall(_) ->
Limiter = emqx_limiter:init(external, [{overall_messages_routing, {100, 1}}]),
{ok, Limiter2} = emqx_limiter:check(#{cnt => 1, oct => 0}, Limiter),
#{overall_messages_routing := #{tokens := 99}} = emqx_limiter:info(Limiter2),
%% XXX: P = 1/r = 1/100 * 1000 = 10ms ?
{pause, _, Limiter3} = emqx_limiter:check(#{cnt => 100, oct => 0}, Limiter),
#{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter2),
%% XXX: P = 10000/r = 10000/100 * 1000 = 100s ?
{pause, _, Limiter4} = emqx_limiter:check(#{cnt => 10000, oct => 0}, Limiter3),
#{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter4).

View File

@ -63,7 +63,7 @@
%% The {active, N} option %% The {active, N} option
active_n :: pos_integer(), active_n :: pos_integer(),
%% Limiter %% Limiter
limiter :: maybe(emqx_limiter:limiter()), limiter :: maybe(emqx_htb_limiter:limiter()),
%% Limit Timer %% Limit Timer
limit_timer :: maybe(reference()), limit_timer :: maybe(reference()),
%% Parse State %% Parse State
@ -277,7 +277,7 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) ->
conn_mod => ?MODULE conn_mod => ?MODULE
}, },
ActiveN = emqx_gateway_utils:active_n(Options), ActiveN = emqx_gateway_utils:active_n(Options),
%% FIXME: %% FIXME: TODO
%%Limiter = emqx_limiter:init(Options), %%Limiter = emqx_limiter:init(Options),
Limiter = undefined, Limiter = undefined,
FrameOpts = emqx_gateway_utils:frame_options(Options), FrameOpts = emqx_gateway_utils:frame_options(Options),
@ -883,25 +883,31 @@ handle_info(Info, State) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure rate limit %% Ensure rate limit
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> %% ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of %% case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
false -> %% false ->
State; %% State;
{ok, Limiter1} -> %% {ok, Limiter1} ->
State#state{limiter = Limiter1}; %% State#state{limiter = Limiter1};
{pause, Time, Limiter1} -> %% {pause, Time, Limiter1} ->
%% XXX: which limiter reached? %% %% XXX: which limiter reached?
?SLOG(warning, #{ %% ?SLOG(warning, #{
msg => "reach_rate_limit", %% msg => "reach_rate_limit",
pause => Time %% pause => Time
}), %% }),
TRef = emqx_misc:start_timer(Time, limit_timeout), %% TRef = emqx_misc:start_timer(Time, limit_timeout),
State#state{ %% State#state{
sockstate = blocked, %% sockstate = blocked,
limiter = Limiter1, %% limiter = Limiter1,
limit_timer = TRef %% limit_timer = TRef
} %% }
end. %% end.
%% TODO
%% Why do we need this?
%% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly?
ensure_rate_limit(_Stats, State) ->
State.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Run GC and Check OOM %% Run GC and Check OOM