feat(ratelimit): refactor ratelimit
This commit is contained in:
parent
961e7b9818
commit
83a2af812f
|
@ -680,12 +680,6 @@ mqtt.strict_mode = false
|
||||||
## Value: duration
|
## Value: duration
|
||||||
zone.external.idle_timeout = 15s
|
zone.external.idle_timeout = 15s
|
||||||
|
|
||||||
## Publish limit for the external MQTT connections.
|
|
||||||
##
|
|
||||||
## Value: Number,Duration
|
|
||||||
## Example: 100 messages per 10 seconds.
|
|
||||||
## zone.external.publish_limit = 100,10s
|
|
||||||
|
|
||||||
## Enable ACL check.
|
## Enable ACL check.
|
||||||
##
|
##
|
||||||
## Value: Flag
|
## Value: Flag
|
||||||
|
@ -848,6 +842,28 @@ zone.external.mqueue_store_qos0 = true
|
||||||
## Value: on | off
|
## Value: on | off
|
||||||
zone.external.enable_flapping_detect = off
|
zone.external.enable_flapping_detect = off
|
||||||
|
|
||||||
|
## Message limit for the a external MQTT connection.
|
||||||
|
##
|
||||||
|
## Value: Number,Duration
|
||||||
|
## Example: 100 messages per 10 seconds.
|
||||||
|
#zone.external.conn_rate_limit.messages_in = 100, 10s
|
||||||
|
|
||||||
|
## Bytes limit for a external MQTT connections.
|
||||||
|
##
|
||||||
|
## Value: Number,Duration
|
||||||
|
## Example: 100KB incoming per 10 seconds.
|
||||||
|
#zone.external.conn_rate_limit.bytes_in = 100KB, 10s
|
||||||
|
|
||||||
|
## Message limit for the all external MQTT connections.
|
||||||
|
##
|
||||||
|
## Value: Number,Duration
|
||||||
|
#zone.external.overall_rate_limit.messages_in = 200000, 1s
|
||||||
|
|
||||||
|
## Bytes limit for the all external MQTT connections.
|
||||||
|
##
|
||||||
|
## Value: Number,Duration
|
||||||
|
#zone.external.overall_rate_limit.bytes_in = 2048MB, 1s
|
||||||
|
|
||||||
## All the topics will be prefixed with the mountpoint path if this option is enabled.
|
## All the topics will be prefixed with the mountpoint path if this option is enabled.
|
||||||
##
|
##
|
||||||
## Variables in mountpoint path:
|
## Variables in mountpoint path:
|
||||||
|
@ -1016,12 +1032,6 @@ listener.tcp.external.active_n = 100
|
||||||
## Value: String
|
## Value: String
|
||||||
listener.tcp.external.zone = external
|
listener.tcp.external.zone = external
|
||||||
|
|
||||||
## Rate limit for the external MQTT/TCP connections. Format is 'limit,duration'.
|
|
||||||
##
|
|
||||||
## Value: limit,duration
|
|
||||||
## Default: 100KB incoming per 10 seconds.
|
|
||||||
## listener.tcp.external.rate_limit = 100KB,10s
|
|
||||||
|
|
||||||
## The access control rules for the MQTT/TCP listener.
|
## The access control rules for the MQTT/TCP listener.
|
||||||
##
|
##
|
||||||
## See: https://github.com/emqtt/esockd#allowdeny
|
## See: https://github.com/emqtt/esockd#allowdeny
|
||||||
|
@ -1145,14 +1155,6 @@ listener.tcp.internal.active_n = 1000
|
||||||
## Value: String
|
## Value: String
|
||||||
listener.tcp.internal.zone = internal
|
listener.tcp.internal.zone = internal
|
||||||
|
|
||||||
## Rate limit for the internal MQTT/TCP connections.
|
|
||||||
##
|
|
||||||
## See: listener.tcp.$name.rate_limit
|
|
||||||
##
|
|
||||||
## Value: limit,duration
|
|
||||||
## Default: 1MB incoming per second.
|
|
||||||
## listener.tcp.internal.rate_limit = 1MB,1s
|
|
||||||
|
|
||||||
## The TCP backlog of internal MQTT/TCP Listener.
|
## The TCP backlog of internal MQTT/TCP Listener.
|
||||||
##
|
##
|
||||||
## See: listener.tcp.$name.backlog
|
## See: listener.tcp.$name.backlog
|
||||||
|
@ -1257,12 +1259,6 @@ listener.ssl.external.zone = external
|
||||||
## Value: ACL Rule
|
## Value: ACL Rule
|
||||||
listener.ssl.external.access.1 = allow all
|
listener.ssl.external.access.1 = allow all
|
||||||
|
|
||||||
## Rate limit for the external MQTT/SSL connections.
|
|
||||||
##
|
|
||||||
## Value: limit,duration
|
|
||||||
## Default: 100KB incoming per 10 seconds.
|
|
||||||
## listener.ssl.external.rate_limit = 100KB,10s
|
|
||||||
|
|
||||||
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
||||||
## HAProxy or Nginx.
|
## HAProxy or Nginx.
|
||||||
##
|
##
|
||||||
|
@ -1496,12 +1492,6 @@ listener.ws.external.max_conn_rate = 1000
|
||||||
## Value: Number
|
## Value: Number
|
||||||
listener.ws.external.active_n = 100
|
listener.ws.external.active_n = 100
|
||||||
|
|
||||||
## Rate limit for the MQTT/WebSocket connections.
|
|
||||||
##
|
|
||||||
## Value: Limit,Duration
|
|
||||||
## Default: 100KB incoming per 10 seconds.
|
|
||||||
## listener.ws.external.rate_limit = 100KB,10s
|
|
||||||
|
|
||||||
## Zone of the external MQTT/WebSocket listener belonged to.
|
## Zone of the external MQTT/WebSocket listener belonged to.
|
||||||
##
|
##
|
||||||
## Value: String
|
## Value: String
|
||||||
|
@ -1697,12 +1687,6 @@ listener.wss.external.max_conn_rate = 1000
|
||||||
## Value: Number
|
## Value: Number
|
||||||
listener.wss.external.active_n = 100
|
listener.wss.external.active_n = 100
|
||||||
|
|
||||||
## Rate limit for the MQTT/WebSocket/SSL connections.
|
|
||||||
##
|
|
||||||
## Value: Limit,Duration
|
|
||||||
## Default: 100KB incoming per 10 seconds.
|
|
||||||
## listener.wss.external.rate_limit = 100KB,10s
|
|
||||||
|
|
||||||
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
|
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
|
||||||
##
|
##
|
||||||
## Value: String
|
## Value: String
|
||||||
|
|
|
@ -992,6 +992,22 @@ end}.
|
||||||
{default, off}
|
{default, off}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.conn_rate_limit.messages_in", "emqx.zones", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.conn_rate_limit.bytes_in", "emqx.zones", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.overall_rate_limit.messages_in", "emqx.zones", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "zone.$name.overall_rate_limit.bytes_in", "emqx.zones", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
%% @doc Force connection/session process GC after this number of
|
%% @doc Force connection/session process GC after this number of
|
||||||
%% messages | bytes passed through.
|
%% messages | bytes passed through.
|
||||||
%% Numbers delimited by `|'. Zero or negative is to disable.
|
%% Numbers delimited by `|'. Zero or negative is to disable.
|
||||||
|
@ -1036,16 +1052,22 @@ end}.
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{translation, "emqx.zones", fun(Conf) ->
|
{translation, "emqx.zones", fun(Conf) ->
|
||||||
Mapping = fun("publish_limit", Val) ->
|
Ratelimit = fun(Val) ->
|
||||||
[L, D] = string:tokens(Val, ", "),
|
[L, D] = string:tokens(Val, ", "),
|
||||||
Limit = list_to_integer(L),
|
Limit = case cuttlefish_bytesize:parse(L) of
|
||||||
|
Sz when is_integer(Sz) -> Sz;
|
||||||
|
{error, Reason1} -> error(Reason1)
|
||||||
|
end,
|
||||||
Duration = case cuttlefish_duration:parse(D, s) of
|
Duration = case cuttlefish_duration:parse(D, s) of
|
||||||
Secs when is_integer(Secs) -> Secs;
|
Secs when is_integer(Secs) -> Secs;
|
||||||
{error, Reason} -> error(Reason)
|
{error, Reason} -> error(Reason)
|
||||||
end,
|
end,
|
||||||
Rate = Limit / Duration,
|
{Limit, Duration}
|
||||||
{publish_limit, {Rate, Limit}};
|
end,
|
||||||
("force_gc_policy", Val) ->
|
Mapping = fun(["publish_limit"], Val) ->
|
||||||
|
%% XXX: Deprecated at v4.2
|
||||||
|
{publish_limit, Ratelimit(Val)};
|
||||||
|
(["force_gc_policy"], Val) ->
|
||||||
[Count, Bytes] = string:tokens(Val, "| "),
|
[Count, Bytes] = string:tokens(Val, "| "),
|
||||||
GcPolicy = case cuttlefish_bytesize:parse(Bytes) of
|
GcPolicy = case cuttlefish_bytesize:parse(Bytes) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -1055,7 +1077,7 @@ end}.
|
||||||
count => list_to_integer(Count)}
|
count => list_to_integer(Count)}
|
||||||
end,
|
end,
|
||||||
{force_gc_policy, GcPolicy};
|
{force_gc_policy, GcPolicy};
|
||||||
("force_shutdown_policy", Val) ->
|
(["force_shutdown_policy"], Val) ->
|
||||||
[Len, Siz] = string:tokens(Val, "| "),
|
[Len, Siz] = string:tokens(Val, "| "),
|
||||||
MaxSiz = case WordSize = erlang:system_info(wordsize) of
|
MaxSiz = case WordSize = erlang:system_info(wordsize) of
|
||||||
8 -> % arch_64
|
8 -> % arch_64
|
||||||
|
@ -1074,7 +1096,7 @@ end}.
|
||||||
max_heap_size => Siz1 div WordSize}
|
max_heap_size => Siz1 div WordSize}
|
||||||
end,
|
end,
|
||||||
{force_shutdown_policy, ShutdownPolicy};
|
{force_shutdown_policy, ShutdownPolicy};
|
||||||
("mqueue_priorities", Val) ->
|
(["mqueue_priorities"], Val) ->
|
||||||
case Val of
|
case Val of
|
||||||
"none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
|
"none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -1087,19 +1109,35 @@ end}.
|
||||||
end, #{}, string:tokens(Val, ",")),
|
end, #{}, string:tokens(Val, ",")),
|
||||||
{mqueue_priorities, MqueuePriorities}
|
{mqueue_priorities, MqueuePriorities}
|
||||||
end;
|
end;
|
||||||
("mountpoint", Val) ->
|
(["mountpoint"], Val) ->
|
||||||
{mountpoint, iolist_to_binary(Val)};
|
{mountpoint, iolist_to_binary(Val)};
|
||||||
("response_information", Val) ->
|
(["response_information"], Val) ->
|
||||||
{response_information, iolist_to_binary(Val)};
|
{response_information, iolist_to_binary(Val)};
|
||||||
(Opt, Val) ->
|
(["conn_rate_limit", "messages_in"], Val) ->
|
||||||
|
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
|
||||||
|
(["conn_rate_limit", "bytes_in"], Val) ->
|
||||||
|
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
|
||||||
|
(["overall_rate_limit", "messages_in"], Val) ->
|
||||||
|
{ratelimit, {overall_messages_in, Ratelimit(Val)}};
|
||||||
|
(["overall_rate_limit", "bytes_in"], Val) ->
|
||||||
|
{ratelimit, {overall_bytes_in, Ratelimit(Val)}};
|
||||||
|
([Opt], Val) ->
|
||||||
{list_to_atom(Opt), Val}
|
{list_to_atom(Opt), Val}
|
||||||
end,
|
end,
|
||||||
maps:to_list(
|
maps:to_list(
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({["zone", Name, Opt], Val}, Zones) ->
|
fun({["zone", Name | Opt], Val}, Zones) ->
|
||||||
|
NVal = Mapping(Opt, Val),
|
||||||
maps:update_with(list_to_atom(Name),
|
maps:update_with(list_to_atom(Name),
|
||||||
fun(Opts) -> [Mapping(Opt, Val)|Opts] end,
|
fun(Opts) ->
|
||||||
[Mapping(Opt, Val)], Zones)
|
case NVal of
|
||||||
|
{ratelimit, Rl} ->
|
||||||
|
Rls = proplists:get_value(ratelimit, Opts, []),
|
||||||
|
lists:keystore(ratelimit, 1, Opts, {ratelimit, [Rl|Rls]});
|
||||||
|
_ ->
|
||||||
|
[NVal|Opts]
|
||||||
|
end
|
||||||
|
end, [NVal], Zones)
|
||||||
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
|
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
@ -1736,8 +1774,7 @@ end}.
|
||||||
Secs when is_integer(Secs) -> Secs;
|
Secs when is_integer(Secs) -> Secs;
|
||||||
{error, Reason1} -> error(Reason1)
|
{error, Reason1} -> error(Reason1)
|
||||||
end,
|
end,
|
||||||
Rate = Limit / Duration,
|
{Limit, Duration}
|
||||||
{Rate, Limit}
|
|
||||||
end,
|
end,
|
||||||
|
|
||||||
LisOpts = fun(Prefix) ->
|
LisOpts = fun(Prefix) ->
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
|
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.1"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
|
|
|
@ -198,7 +198,9 @@ init_state(Transport, Socket, Options) ->
|
||||||
Zone = proplists:get_value(zone, Options),
|
Zone = proplists:get_value(zone, Options),
|
||||||
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
|
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
|
||||||
PubLimit = emqx_zone:publish_limit(Zone),
|
PubLimit = emqx_zone:publish_limit(Zone),
|
||||||
Limiter = emqx_limiter:init([{pub_limit, PubLimit}|Options]),
|
BytesIn = proplists:get_value(rate_limit, Options),
|
||||||
|
RateLimit = emqx_zone:ratelimit(Zone),
|
||||||
|
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
|
||||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||||
Serialize = emqx_frame:serialize_fun(),
|
Serialize = emqx_frame:serialize_fun(),
|
||||||
|
|
|
@ -18,55 +18,137 @@
|
||||||
|
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
|
|
||||||
-export([init/1, info/1, check/2]).
|
-export([ init/2
|
||||||
|
, init/4 %% XXX: Compatible with before 4.2 version
|
||||||
-import(emqx_misc, [maybe_apply/2]).
|
, info/1
|
||||||
|
, check/2
|
||||||
|
]).
|
||||||
|
|
||||||
-record(limiter, {
|
-record(limiter, {
|
||||||
%% Publish Limit
|
%% Zone
|
||||||
pub_limit :: maybe(esockd_rate_limit:bucket()),
|
zone :: emqx_zone:zone(),
|
||||||
%% Rate Limit
|
%% All checkers
|
||||||
rate_limit :: maybe(esockd_rate_limit:bucket())
|
checkers :: [checker()]
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type(checker() :: #{ name := name()
|
||||||
|
, capacity := non_neg_integer()
|
||||||
|
, interval := non_neg_integer()
|
||||||
|
, consumer := function() | esockd_rate_limit:bucket()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type(name() :: conn_bytes_in
|
||||||
|
| conn_messages_in
|
||||||
|
| overall_bytes_in
|
||||||
|
| overall_messages_in
|
||||||
|
).
|
||||||
|
|
||||||
|
-type(spec() :: {name(), esockd_rate_limit:config()}).
|
||||||
|
|
||||||
|
-type(specs() :: [spec()]).
|
||||||
|
|
||||||
|
-type(info() :: #{name() :=
|
||||||
|
#{tokens := non_neg_integer(),
|
||||||
|
capacity := non_neg_integer(),
|
||||||
|
interval := non_neg_integer()}}).
|
||||||
|
|
||||||
-type(limiter() :: #limiter{}).
|
-type(limiter() :: #limiter{}).
|
||||||
|
|
||||||
-export_type([limiter/0]).
|
%%--------------------------------------------------------------------
|
||||||
|
%% APIs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(ENABLED(Rl), (Rl =/= undefined)).
|
-spec(init(emqx_zone:zone(),
|
||||||
|
maybe(esockd_rate_limit:config()),
|
||||||
|
maybe(esockd_rate_limit:config()), specs())
|
||||||
|
-> maybe(limiter())).
|
||||||
|
init(Zone, PubLimit, BytesIn, RateLimit) ->
|
||||||
|
Merged = maps:merge(#{conn_messages_in => PubLimit,
|
||||||
|
conn_bytes_in => BytesIn}, maps:from_list(RateLimit)),
|
||||||
|
Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
|
||||||
|
init(Zone, maps:to_list(Filtered)).
|
||||||
|
|
||||||
-spec(init(proplists:proplist()) -> maybe(limiter())).
|
-spec(init(emqx_zone:zone(), specs()) -> maybe(limiter())).
|
||||||
init(Options) ->
|
init(_Zone, []) ->
|
||||||
Pl = proplists:get_value(pub_limit, Options),
|
undefined;
|
||||||
Rl = proplists:get_value(rate_limit, Options),
|
init(Zone, Specs) ->
|
||||||
case ?ENABLED(Pl) or ?ENABLED(Rl) of
|
#limiter{zone = Zone, checkers = [do_init_checker(Zone, Spec) || Spec <- Specs]}.
|
||||||
true -> #limiter{pub_limit = init_limit(Pl),
|
|
||||||
rate_limit = init_limit(Rl)
|
%% @private
|
||||||
};
|
do_init_checker(Zone, {Name, {Capacity, Interval}}) ->
|
||||||
false -> undefined
|
Ck = #{name => Name, capacity => Capacity, interval => Interval},
|
||||||
|
case is_overall_limiter(Name) of
|
||||||
|
true ->
|
||||||
|
case catch esockd_limiter:lookup({Zone, Name}) of
|
||||||
|
_Info when is_map(_Info) ->
|
||||||
|
ignore;
|
||||||
|
_ ->
|
||||||
|
esockd_limiter:create({Zone, Name}, Capacity, Interval)
|
||||||
|
end,
|
||||||
|
Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end};
|
||||||
|
_ ->
|
||||||
|
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
init_limit(Rl) ->
|
-spec(info(limiter()) -> info()).
|
||||||
maybe_apply(fun esockd_rate_limit:new/1, Rl).
|
info(#limiter{zone = Zone, checkers = Cks}) ->
|
||||||
|
maps:from_list([get_info(Zone, Ck) || Ck <- Cks]).
|
||||||
|
|
||||||
info(#limiter{pub_limit = Pl, rate_limit = Rl}) ->
|
-spec(check(#{cnt := Cnt :: non_neg_integer(),
|
||||||
#{pub_limit => info(Pl), rate_limit => info(Rl)};
|
oct := Oct :: non_neg_integer()},
|
||||||
|
Limiter :: limiter())
|
||||||
info(Rl) ->
|
-> {ok, NLimiter :: limiter()}
|
||||||
maybe_apply(fun esockd_rate_limit:info/1, Rl).
|
| {pause, MilliSecs :: non_neg_integer(), NLimiter :: limiter()}).
|
||||||
|
check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{checkers = Cks}) ->
|
||||||
check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{pub_limit = Pl,
|
{Pauses, NCks} = do_check(Cnt, Oct, Cks, [], []),
|
||||||
rate_limit = Rl}) ->
|
case lists:max(Pauses) of
|
||||||
do_check([{#limiter.pub_limit, Cnt, Pl} || ?ENABLED(Pl)] ++
|
I when I > 0 ->
|
||||||
[{#limiter.rate_limit, Oct, Rl} || ?ENABLED(Rl)], Limiter).
|
{pause, I, Limiter#limiter{checkers = NCks}};
|
||||||
|
_ ->
|
||||||
do_check([], Limiter) ->
|
{ok, Limiter#limiter{checkers = NCks}}
|
||||||
{ok, Limiter};
|
|
||||||
do_check([{Pos, Cnt, Rl}|More], Limiter) ->
|
|
||||||
case esockd_rate_limit:check(Cnt, Rl) of
|
|
||||||
{0, Rl1} ->
|
|
||||||
do_check(More, setelement(Pos, Limiter, Rl1));
|
|
||||||
{Pause, Rl1} ->
|
|
||||||
{pause, Pause, setelement(Pos, Limiter, Rl1)}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
do_check(_, _, [], Pauses, NCks) ->
|
||||||
|
{Pauses, lists:reverse(NCks)};
|
||||||
|
do_check(Pubs, Bytes, [Ck|More], Pauses, Acc) ->
|
||||||
|
{I, NConsumer} = consume(Pubs, Bytes, Ck),
|
||||||
|
do_check(Pubs, Bytes, More, [I|Pauses], [Ck#{consumer := NConsumer}|Acc]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
|
||||||
|
Tokens = case is_message_limiter(Name) of true -> Pubs; _ -> Bytes end,
|
||||||
|
case Tokens =:= 0 of
|
||||||
|
true ->
|
||||||
|
{0, Cons};
|
||||||
|
_ ->
|
||||||
|
case is_overall_limiter(Name) of
|
||||||
|
true ->
|
||||||
|
{_, Intv} = Cons(Tokens),
|
||||||
|
{Intv, Cons};
|
||||||
|
_ ->
|
||||||
|
esockd_rate_limit:check(Tokens, Cons)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_info(Zone, #{name := Name, capacity := Cap,
|
||||||
|
interval := Intv, consumer := Cons}) ->
|
||||||
|
Info = case is_overall_limiter(Name) of
|
||||||
|
true -> esockd_limiter:lookup({Zone, Name});
|
||||||
|
_ -> esockd_rate_limit:info(Cons)
|
||||||
|
end,
|
||||||
|
{Name, #{capacity => Cap,
|
||||||
|
interval => Intv,
|
||||||
|
tokens => maps:get(tokens, Info)}}.
|
||||||
|
|
||||||
|
is_overall_limiter(overall_bytes_in) -> true;
|
||||||
|
is_overall_limiter(overall_messages_in) -> true;
|
||||||
|
is_overall_limiter(_) -> false.
|
||||||
|
|
||||||
|
is_message_limiter(conn_messages_in) -> true;
|
||||||
|
is_message_limiter(overall_messages_in) -> true;
|
||||||
|
is_message_limiter(_) -> false.
|
||||||
|
|
||||||
|
|
|
@ -222,7 +222,9 @@ websocket_init([Req, Opts]) ->
|
||||||
},
|
},
|
||||||
Zone = proplists:get_value(zone, Opts),
|
Zone = proplists:get_value(zone, Opts),
|
||||||
PubLimit = emqx_zone:publish_limit(Zone),
|
PubLimit = emqx_zone:publish_limit(Zone),
|
||||||
Limiter = emqx_limiter:init([{pub_limit, PubLimit}|Opts]),
|
BytesIn = proplists:get_value(rate_limit, Opts),
|
||||||
|
RateLimit = emqx_zone:ratelimit(Zone),
|
||||||
|
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
|
||||||
ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
|
ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
|
||||||
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
|
||||||
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
ParseState = emqx_frame:initial_parse_state(FrameOpts),
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
-compile({inline,
|
-compile({inline,
|
||||||
[ idle_timeout/1
|
[ idle_timeout/1
|
||||||
, publish_limit/1
|
, publish_limit/1
|
||||||
|
, ratelimit/1
|
||||||
, mqtt_frame_options/1
|
, mqtt_frame_options/1
|
||||||
, mqtt_strict_mode/1
|
, mqtt_strict_mode/1
|
||||||
, max_packet_size/1
|
, max_packet_size/1
|
||||||
|
@ -55,7 +56,9 @@
|
||||||
|
|
||||||
%% Zone Option API
|
%% Zone Option API
|
||||||
-export([ idle_timeout/1
|
-export([ idle_timeout/1
|
||||||
|
%% XXX: Dedeprecated at v4.2
|
||||||
, publish_limit/1
|
, publish_limit/1
|
||||||
|
, ratelimit/1
|
||||||
, mqtt_frame_options/1
|
, mqtt_frame_options/1
|
||||||
, mqtt_strict_mode/1
|
, mqtt_strict_mode/1
|
||||||
, max_packet_size/1
|
, max_packet_size/1
|
||||||
|
@ -137,6 +140,10 @@ idle_timeout(Zone) ->
|
||||||
publish_limit(Zone) ->
|
publish_limit(Zone) ->
|
||||||
get_env(Zone, publish_limit).
|
get_env(Zone, publish_limit).
|
||||||
|
|
||||||
|
-spec(ratelimit(zone()) -> [emqx_limiter:specs()]).
|
||||||
|
ratelimit(Zone) ->
|
||||||
|
get_env(Zone, ratelimit, []).
|
||||||
|
|
||||||
-spec(mqtt_frame_options(zone()) -> emqx_frame:options()).
|
-spec(mqtt_frame_options(zone()) -> emqx_frame:options()).
|
||||||
mqtt_frame_options(Zone) ->
|
mqtt_frame_options(Zone) ->
|
||||||
#{strict_mode => mqtt_strict_mode(Zone),
|
#{strict_mode => mqtt_strict_mode(Zone),
|
||||||
|
|
|
@ -104,7 +104,7 @@ t_info(_) ->
|
||||||
socktype := tcp}, SockInfo).
|
socktype := tcp}, SockInfo).
|
||||||
|
|
||||||
t_info_limiter(_) ->
|
t_info_limiter(_) ->
|
||||||
St = st(#{limiter => emqx_limiter:init([])}),
|
St = st(#{limiter => emqx_limiter:init(external, [])}),
|
||||||
?assertEqual(undefined, emqx_connection:info(limiter, St)).
|
?assertEqual(undefined, emqx_connection:info(limiter, St)).
|
||||||
|
|
||||||
t_stats(_) ->
|
t_stats(_) ->
|
||||||
|
@ -279,11 +279,11 @@ t_ensure_rate_limit(_) ->
|
||||||
State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})),
|
State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})),
|
||||||
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
||||||
|
|
||||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end),
|
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init(external, [])} end),
|
||||||
State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||||
?assertEqual(undefined, emqx_connection:info(limiter, State1)),
|
?assertEqual(undefined, emqx_connection:info(limiter, State1)),
|
||||||
|
|
||||||
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end),
|
ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end),
|
||||||
State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})),
|
||||||
?assertEqual(undefined, emqx_connection:info(limiter, State2)),
|
?assertEqual(undefined, emqx_connection:info(limiter, State2)),
|
||||||
?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
|
?assertEqual(blocked, emqx_connection:info(sockstate, State2)).
|
||||||
|
|
|
@ -21,38 +21,57 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_, Cfg) ->
|
||||||
Config.
|
{ok, _} = esockd_limiter:start_link(),
|
||||||
|
Cfg.
|
||||||
|
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_, _) ->
|
||||||
ok.
|
esockd_limiter:stop().
|
||||||
|
|
||||||
t_info(_) ->
|
%%--------------------------------------------------------------------
|
||||||
#{pub_limit := #{rate := 1,
|
%% Cases
|
||||||
burst := 10,
|
%%--------------------------------------------------------------------
|
||||||
tokens := 10
|
|
||||||
},
|
|
||||||
rate_limit := #{rate := 100,
|
|
||||||
burst := 1000,
|
|
||||||
tokens := 1000
|
|
||||||
}
|
|
||||||
} = emqx_limiter:info(limiter()).
|
|
||||||
|
|
||||||
t_check(_) ->
|
t_init(_) ->
|
||||||
lists:foreach(fun(I) ->
|
Cap1 = 1000, Intv1 = 10,
|
||||||
{ok, Limiter} = emqx_limiter:check(#{cnt => I, oct => I*100}, limiter()),
|
Cap2 = 2000, Intv2 = 15,
|
||||||
#{pub_limit := #{tokens := Cnt},
|
undefined = emqx_limiter:init(external, undefined, undefined, []),
|
||||||
rate_limit := #{tokens := Oct}
|
?assertEqual(emqx_limiter:init(external, undefined, undefined, [{conn_messages_in, {Cap1, Intv1}},
|
||||||
} = emqx_limiter:info(Limiter),
|
{conn_bytes_in, {Cap2, Intv2}}]),
|
||||||
?assertEqual({10 - I, 1000 - I*100}, {Cnt, Oct})
|
emqx_limiter:init(external, {Cap1, Intv1}, {Cap2, Intv2}, [])),
|
||||||
end, lists:seq(1, 10)).
|
#{conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2 }} =
|
||||||
|
emqx_limiter:info(
|
||||||
|
emqx_limiter:init(external, undefined, {Cap1, Intv1}, [{conn_bytes_in, {Cap2, Intv2}}])).
|
||||||
|
|
||||||
t_check_pause(_) ->
|
t_check_conn(_) ->
|
||||||
{pause, 1000, _} = emqx_limiter:check(#{cnt => 11, oct => 2000}, limiter()),
|
Limiter = emqx_limiter:init(external, [{conn_bytes_in, {100, 1}}]),
|
||||||
{pause, 2000, _} = emqx_limiter:check(#{cnt => 10, oct => 1200}, limiter()).
|
|
||||||
|
|
||||||
limiter() ->
|
{ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter),
|
||||||
emqx_limiter:init([{pub_limit, {1, 10}}, {rate_limit, {100, 1000}}]).
|
#{conn_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2),
|
||||||
|
|
||||||
|
{pause, 10, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter),
|
||||||
|
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter3),
|
||||||
|
|
||||||
|
{pause, 100000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3),
|
||||||
|
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
||||||
|
|
||||||
|
t_check_overall(_) ->
|
||||||
|
Limiter = emqx_limiter:init(external, [{overall_bytes_in, {100, 1}}]),
|
||||||
|
|
||||||
|
{ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter),
|
||||||
|
#{overall_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2),
|
||||||
|
|
||||||
|
%% XXX: P = 1/r = 1/100 * 1000 = 10ms ?
|
||||||
|
{pause, 1000, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter),
|
||||||
|
#{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter2),
|
||||||
|
|
||||||
|
%% XXX: P = 10000/r = 10000/100 * 1000 = 100s ?
|
||||||
|
{pause, 1000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3),
|
||||||
|
#{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ t_info(_) ->
|
||||||
} = SockInfo.
|
} = SockInfo.
|
||||||
|
|
||||||
t_info_limiter(_) ->
|
t_info_limiter(_) ->
|
||||||
St = st(#{limiter => emqx_limiter:init([])}),
|
St = st(#{limiter => emqx_limiter:init(external, [])}),
|
||||||
?assertEqual(undefined, ?ws_conn:info(limiter, St)).
|
?assertEqual(undefined, ?ws_conn:info(limiter, St)).
|
||||||
|
|
||||||
t_info_channel(_) ->
|
t_info_channel(_) ->
|
||||||
|
@ -291,9 +291,7 @@ t_handle_timeout_emit_stats(_) ->
|
||||||
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
|
?assertEqual(undefined, ?ws_conn:info(stats_timer, St)).
|
||||||
|
|
||||||
t_ensure_rate_limit(_) ->
|
t_ensure_rate_limit(_) ->
|
||||||
Limiter = emqx_limiter:init([{pub_limit, {1, 10}},
|
Limiter = emqx_limiter:init(external, {1, 10}, {100, 1000}, []),
|
||||||
{rate_limit, {100, 1000}}
|
|
||||||
]),
|
|
||||||
St = st(#{limiter => Limiter}),
|
St = st(#{limiter => Limiter}),
|
||||||
St1 = ?ws_conn:ensure_rate_limit(#{cnt => 0, oct => 0}, St),
|
St1 = ?ws_conn:ensure_rate_limit(#{cnt => 0, oct => 0}, St),
|
||||||
St2 = ?ws_conn:ensure_rate_limit(#{cnt => 11, oct => 1200}, St1),
|
St2 = ?ws_conn:ensure_rate_limit(#{cnt => 11, oct => 1200}, St1),
|
||||||
|
|
|
@ -193,6 +193,7 @@ t_batch_subscribe(_) ->
|
||||||
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
|
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
|
||||||
<<"t2">>,
|
<<"t2">>,
|
||||||
<<"t3">>]),
|
<<"t3">>]),
|
||||||
|
file:delete(TempAcl),
|
||||||
emqtt:disconnect(Client).
|
emqtt:disconnect(Client).
|
||||||
|
|
||||||
t_connect_will_retain(_) ->
|
t_connect_will_retain(_) ->
|
||||||
|
@ -253,7 +254,7 @@ t_connect_limit_timeout(_) ->
|
||||||
end),
|
end),
|
||||||
|
|
||||||
Topic = nth(1, ?TOPICS),
|
Topic = nth(1, ?TOPICS),
|
||||||
emqx_zone:set_env(external, publish_limit, {2.0, 3}),
|
emqx_zone:set_env(external, publish_limit, {3, 5}),
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
@ -263,11 +264,11 @@ t_connect_limit_timeout(_) ->
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
|
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
|
||||||
|
|
||||||
ok = emqtt:disconnect(Client),
|
ok = emqtt:disconnect(Client),
|
||||||
|
emqx_zone:set_env(external, publish_limit, undefined),
|
||||||
meck:unload(proplists).
|
meck:unload(proplists).
|
||||||
|
|
||||||
t_connect_emit_stats_timeout(_) ->
|
t_connect_emit_stats_timeout(_) ->
|
||||||
|
|
Loading…
Reference in New Issue