168 lines
5.7 KiB
Erlang
168 lines
5.7 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Ratelimit or Quota checker
|
|
-module(emqx_limiter).
|
|
|
|
-include("types.hrl").
|
|
|
|
-export([ init/2
|
|
, init/4 %% XXX: Compatible with before 4.2 version
|
|
, info/1
|
|
, check/2
|
|
, update_overall_limiter/4
|
|
]).
|
|
|
|
-record(limiter, {
|
|
%% Zone
|
|
zone :: emqx_zone:zone(),
|
|
%% Checkers
|
|
checkers :: [checker()]
|
|
}).
|
|
|
|
-type(checker() :: #{ name := name()
|
|
, capacity := non_neg_integer()
|
|
, interval := non_neg_integer()
|
|
, consumer := esockd_rate_limit:bucket() | emqx_zone:zone()
|
|
}).
|
|
|
|
-type(name() :: conn_bytes_in
|
|
| conn_messages_in
|
|
| conn_messages_routing
|
|
| overall_messages_routing
|
|
).
|
|
|
|
-type(policy() :: [{name(), esockd_rate_limit:config()}]).
|
|
|
|
-type(info() :: #{name() :=
|
|
#{tokens := non_neg_integer(),
|
|
capacity := non_neg_integer(),
|
|
interval := non_neg_integer()}}).
|
|
|
|
-type(limiter() :: #limiter{}).
|
|
|
|
-dialyzer({nowarn_function, [consume/3]}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(init(emqx_zone:zone(),
|
|
maybe(esockd_rate_limit:config()),
|
|
maybe(esockd_rate_limit:config()), policy())
|
|
-> maybe(limiter())).
|
|
init(Zone, PubLimit, BytesIn, Specs) ->
|
|
Merged = maps:merge(#{conn_messages_in => PubLimit,
|
|
conn_bytes_in => BytesIn}, maps:from_list(Specs)),
|
|
Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
|
|
init(Zone, maps:to_list(Filtered)).
|
|
|
|
-spec(init(emqx_zone:zone(), policy()) -> maybe(limiter())).
|
|
init(_Zone, []) ->
|
|
undefined;
|
|
init(Zone, Specs) ->
|
|
#limiter{zone = Zone, checkers = [do_init_checker(Zone, Spec) || Spec <- Specs]}.
|
|
|
|
%% @private
|
|
do_init_checker(Zone, {Name, {Capacity, Interval}}) ->
|
|
Ck = #{name => Name, capacity => Capacity, interval => Interval},
|
|
case is_overall_limiter(Name) of
|
|
true ->
|
|
case catch esockd_limiter:lookup({Zone, Name}) of
|
|
_Info when is_map(_Info) ->
|
|
ignore;
|
|
_ ->
|
|
esockd_limiter:create({Zone, Name}, Capacity, Interval)
|
|
end,
|
|
Ck#{consumer => Zone};
|
|
_ ->
|
|
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
|
|
end.
|
|
|
|
-spec(info(limiter()) -> info()).
|
|
info(#limiter{zone = Zone, checkers = Cks}) ->
|
|
maps:from_list([get_info(Zone, Ck) || Ck <- Cks]).
|
|
|
|
-spec(check(#{cnt := Cnt :: non_neg_integer(),
|
|
oct := Oct :: non_neg_integer()},
|
|
Limiter :: limiter())
|
|
-> {ok, NLimiter :: limiter()}
|
|
| {pause, MilliSecs :: non_neg_integer(), NLimiter :: limiter()}).
|
|
check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{checkers = Cks}) ->
|
|
{Pauses, NCks} = do_check(Cnt, Oct, Cks, [], []),
|
|
case lists:max(Pauses) of
|
|
I when I > 0 ->
|
|
{pause, I, Limiter#limiter{checkers = NCks}};
|
|
_ ->
|
|
{ok, Limiter#limiter{checkers = NCks}}
|
|
end.
|
|
|
|
%% @private
|
|
do_check(_, _, [], Pauses, NCks) ->
|
|
{Pauses, lists:reverse(NCks)};
|
|
do_check(Pubs, Bytes, [Ck|More], Pauses, Acc) ->
|
|
{I, NConsumer} = consume(Pubs, Bytes, Ck),
|
|
do_check(Pubs, Bytes, More, [I|Pauses], [Ck#{consumer := NConsumer}|Acc]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
|
|
Tokens = case is_message_limiter(Name) of true -> Pubs; _ -> Bytes end,
|
|
case Tokens =:= 0 of
|
|
true ->
|
|
{0, Cons};
|
|
_ ->
|
|
case is_overall_limiter(Name) of
|
|
true ->
|
|
{_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens),
|
|
{Intv, Cons};
|
|
_ ->
|
|
esockd_rate_limit:check(Tokens, Cons)
|
|
end
|
|
end.
|
|
|
|
get_info(Zone, #{name := Name, capacity := Cap,
|
|
interval := Intv, consumer := Cons}) ->
|
|
Info = case is_overall_limiter(Name) of
|
|
true -> esockd_limiter:lookup({Zone, Name});
|
|
_ -> esockd_rate_limit:info(Cons)
|
|
end,
|
|
{Name, #{capacity => Cap,
|
|
interval => Intv,
|
|
tokens => maps:get(tokens, Info)}}.
|
|
|
|
is_overall_limiter(overall_messages_routing) -> true;
|
|
is_overall_limiter(_) -> false.
|
|
|
|
is_message_limiter(conn_messages_in) -> true;
|
|
is_message_limiter(conn_messages_routing) -> true;
|
|
is_message_limiter(overall_messages_routing) -> true;
|
|
is_message_limiter(_) -> false.
|
|
|
|
update_overall_limiter(Zone, Name, Capacity, Interval) ->
|
|
case is_overall_limiter(Name) of
|
|
false -> false;
|
|
_ ->
|
|
try
|
|
esockd_limiter:update({Zone, Name}, Capacity, Interval),
|
|
true
|
|
catch _:_:_ ->
|
|
false
|
|
end
|
|
end.
|