From 83a2af812f28b9d12c470541cc878017af33846b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 21 Jul 2020 10:38:10 +0800 Subject: [PATCH] feat(ratelimit): refactor ratelimit --- etc/emqx.conf | 60 ++++------- priv/emqx.schema | 77 ++++++++++---- rebar.config | 2 +- src/emqx_connection.erl | 4 +- src/emqx_limiter.erl | 160 ++++++++++++++++++++++-------- src/emqx_ws_connection.erl | 4 +- src/emqx_zone.erl | 7 ++ test/emqx_connection_SUITE.erl | 6 +- test/emqx_limiter_SUITE.erl | 73 +++++++++----- test/emqx_ws_connection_SUITE.erl | 6 +- test/mqtt_protocol_v5_SUITE.erl | 5 +- 11 files changed, 268 insertions(+), 136 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index b22f22c24..36b50c211 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -680,12 +680,6 @@ mqtt.strict_mode = false ## Value: duration zone.external.idle_timeout = 15s -## Publish limit for the external MQTT connections. -## -## Value: Number,Duration -## Example: 100 messages per 10 seconds. -## zone.external.publish_limit = 100,10s - ## Enable ACL check. ## ## Value: Flag @@ -848,6 +842,28 @@ zone.external.mqueue_store_qos0 = true ## Value: on | off zone.external.enable_flapping_detect = off +## Message limit for the a external MQTT connection. +## +## Value: Number,Duration +## Example: 100 messages per 10 seconds. +#zone.external.conn_rate_limit.messages_in = 100, 10s + +## Bytes limit for a external MQTT connections. +## +## Value: Number,Duration +## Example: 100KB incoming per 10 seconds. +#zone.external.conn_rate_limit.bytes_in = 100KB, 10s + +## Message limit for the all external MQTT connections. +## +## Value: Number,Duration +#zone.external.overall_rate_limit.messages_in = 200000, 1s + +## Bytes limit for the all external MQTT connections. +## +## Value: Number,Duration +#zone.external.overall_rate_limit.bytes_in = 2048MB, 1s + ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -1016,12 +1032,6 @@ listener.tcp.external.active_n = 100 ## Value: String listener.tcp.external.zone = external -## Rate limit for the external MQTT/TCP connections. Format is 'limit,duration'. -## -## Value: limit,duration -## Default: 100KB incoming per 10 seconds. -## listener.tcp.external.rate_limit = 100KB,10s - ## The access control rules for the MQTT/TCP listener. ## ## See: https://github.com/emqtt/esockd#allowdeny @@ -1145,14 +1155,6 @@ listener.tcp.internal.active_n = 1000 ## Value: String listener.tcp.internal.zone = internal -## Rate limit for the internal MQTT/TCP connections. -## -## See: listener.tcp.$name.rate_limit -## -## Value: limit,duration -## Default: 1MB incoming per second. -## listener.tcp.internal.rate_limit = 1MB,1s - ## The TCP backlog of internal MQTT/TCP Listener. ## ## See: listener.tcp.$name.backlog @@ -1257,12 +1259,6 @@ listener.ssl.external.zone = external ## Value: ACL Rule listener.ssl.external.access.1 = allow all -## Rate limit for the external MQTT/SSL connections. -## -## Value: limit,duration -## Default: 100KB incoming per 10 seconds. -## listener.ssl.external.rate_limit = 100KB,10s - ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## @@ -1496,12 +1492,6 @@ listener.ws.external.max_conn_rate = 1000 ## Value: Number listener.ws.external.active_n = 100 -## Rate limit for the MQTT/WebSocket connections. -## -## Value: Limit,Duration -## Default: 100KB incoming per 10 seconds. -## listener.ws.external.rate_limit = 100KB,10s - ## Zone of the external MQTT/WebSocket listener belonged to. ## ## Value: String @@ -1697,12 +1687,6 @@ listener.wss.external.max_conn_rate = 1000 ## Value: Number listener.wss.external.active_n = 100 -## Rate limit for the MQTT/WebSocket/SSL connections. -## -## Value: Limit,Duration -## Default: 100KB incoming per 10 seconds. -## listener.wss.external.rate_limit = 100KB,10s - ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## ## Value: String diff --git a/priv/emqx.schema b/priv/emqx.schema index abd790b53..fee6e196d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -992,6 +992,22 @@ end}. {default, off} ]}. +{mapping, "zone.$name.conn_rate_limit.messages_in", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.conn_rate_limit.bytes_in", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.overall_rate_limit.messages_in", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.overall_rate_limit.bytes_in", "emqx.zones", [ + {datatype, string} +]}. + %% @doc Force connection/session process GC after this number of %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. @@ -1036,16 +1052,22 @@ end}. ]}. {translation, "emqx.zones", fun(Conf) -> - Mapping = fun("publish_limit", Val) -> - [L, D] = string:tokens(Val, ", "), - Limit = list_to_integer(L), - Duration = case cuttlefish_duration:parse(D, s) of - Secs when is_integer(Secs) -> Secs; - {error, Reason} -> error(Reason) - end, - Rate = Limit / Duration, - {publish_limit, {Rate, Limit}}; - ("force_gc_policy", Val) -> + Ratelimit = fun(Val) -> + [L, D] = string:tokens(Val, ", "), + Limit = case cuttlefish_bytesize:parse(L) of + Sz when is_integer(Sz) -> Sz; + {error, Reason1} -> error(Reason1) + end, + Duration = case cuttlefish_duration:parse(D, s) of + Secs when is_integer(Secs) -> Secs; + {error, Reason} -> error(Reason) + end, + {Limit, Duration} + end, + Mapping = fun(["publish_limit"], Val) -> + %% XXX: Deprecated at v4.2 + {publish_limit, Ratelimit(Val)}; + (["force_gc_policy"], Val) -> [Count, Bytes] = string:tokens(Val, "| "), GcPolicy = case cuttlefish_bytesize:parse(Bytes) of {error, Reason} -> @@ -1055,7 +1077,7 @@ end}. count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; - ("force_shutdown_policy", Val) -> + (["force_shutdown_policy"], Val) -> [Len, Siz] = string:tokens(Val, "| "), MaxSiz = case WordSize = erlang:system_info(wordsize) of 8 -> % arch_64 @@ -1074,7 +1096,7 @@ end}. max_heap_size => Siz1 div WordSize} end, {force_shutdown_policy, ShutdownPolicy}; - ("mqueue_priorities", Val) -> + (["mqueue_priorities"], Val) -> case Val of "none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE _ -> @@ -1087,19 +1109,35 @@ end}. end, #{}, string:tokens(Val, ",")), {mqueue_priorities, MqueuePriorities} end; - ("mountpoint", Val) -> + (["mountpoint"], Val) -> {mountpoint, iolist_to_binary(Val)}; - ("response_information", Val) -> + (["response_information"], Val) -> {response_information, iolist_to_binary(Val)}; - (Opt, Val) -> + (["conn_rate_limit", "messages_in"], Val) -> + {ratelimit, {conn_messages_in, Ratelimit(Val)}}; + (["conn_rate_limit", "bytes_in"], Val) -> + {ratelimit, {conn_bytes_in, Ratelimit(Val)}}; + (["overall_rate_limit", "messages_in"], Val) -> + {ratelimit, {overall_messages_in, Ratelimit(Val)}}; + (["overall_rate_limit", "bytes_in"], Val) -> + {ratelimit, {overall_bytes_in, Ratelimit(Val)}}; + ([Opt], Val) -> {list_to_atom(Opt), Val} end, maps:to_list( lists:foldl( - fun({["zone", Name, Opt], Val}, Zones) -> + fun({["zone", Name | Opt], Val}, Zones) -> + NVal = Mapping(Opt, Val), maps:update_with(list_to_atom(Name), - fun(Opts) -> [Mapping(Opt, Val)|Opts] end, - [Mapping(Opt, Val)], Zones) + fun(Opts) -> + case NVal of + {ratelimit, Rl} -> + Rls = proplists:get_value(ratelimit, Opts, []), + lists:keystore(ratelimit, 1, Opts, {ratelimit, [Rl|Rls]}); + _ -> + [NVal|Opts] + end + end, [NVal], Zones) end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf)))) end}. @@ -1736,8 +1774,7 @@ end}. Secs when is_integer(Secs) -> Secs; {error, Reason1} -> error(Reason1) end, - Rate = Limit / Duration, - {Rate, Limit} + {Limit, Duration} end, LisOpts = fun(Prefix) -> diff --git a/rebar.config b/rebar.config index 1ac3d4c9a..17bc69b45 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.1"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e54495a71..ef25c36e0 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -198,7 +198,9 @@ init_state(Transport, Socket, Options) -> Zone = proplists:get_value(zone, Options), ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), PubLimit = emqx_zone:publish_limit(Zone), - Limiter = emqx_limiter:init([{pub_limit, PubLimit}|Options]), + BytesIn = proplists:get_value(rate_limit, Options), + RateLimit = emqx_zone:ratelimit(Zone), + Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index ee456bc45..bea0de2fb 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -18,55 +18,137 @@ -include("types.hrl"). --export([init/1, info/1, check/2]). - --import(emqx_misc, [maybe_apply/2]). +-export([ init/2 + , init/4 %% XXX: Compatible with before 4.2 version + , info/1 + , check/2 + ]). -record(limiter, { - %% Publish Limit - pub_limit :: maybe(esockd_rate_limit:bucket()), - %% Rate Limit - rate_limit :: maybe(esockd_rate_limit:bucket()) + %% Zone + zone :: emqx_zone:zone(), + %% All checkers + checkers :: [checker()] }). +-type(checker() :: #{ name := name() + , capacity := non_neg_integer() + , interval := non_neg_integer() + , consumer := function() | esockd_rate_limit:bucket() + }). + +-type(name() :: conn_bytes_in + | conn_messages_in + | overall_bytes_in + | overall_messages_in + ). + +-type(spec() :: {name(), esockd_rate_limit:config()}). + +-type(specs() :: [spec()]). + +-type(info() :: #{name() := + #{tokens := non_neg_integer(), + capacity := non_neg_integer(), + interval := non_neg_integer()}}). + -type(limiter() :: #limiter{}). --export_type([limiter/0]). +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- --define(ENABLED(Rl), (Rl =/= undefined)). +-spec(init(emqx_zone:zone(), + maybe(esockd_rate_limit:config()), + maybe(esockd_rate_limit:config()), specs()) + -> maybe(limiter())). +init(Zone, PubLimit, BytesIn, RateLimit) -> + Merged = maps:merge(#{conn_messages_in => PubLimit, + conn_bytes_in => BytesIn}, maps:from_list(RateLimit)), + Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged), + init(Zone, maps:to_list(Filtered)). --spec(init(proplists:proplist()) -> maybe(limiter())). -init(Options) -> - Pl = proplists:get_value(pub_limit, Options), - Rl = proplists:get_value(rate_limit, Options), - case ?ENABLED(Pl) or ?ENABLED(Rl) of - true -> #limiter{pub_limit = init_limit(Pl), - rate_limit = init_limit(Rl) - }; - false -> undefined +-spec(init(emqx_zone:zone(), specs()) -> maybe(limiter())). +init(_Zone, []) -> + undefined; +init(Zone, Specs) -> + #limiter{zone = Zone, checkers = [do_init_checker(Zone, Spec) || Spec <- Specs]}. + +%% @private +do_init_checker(Zone, {Name, {Capacity, Interval}}) -> + Ck = #{name => Name, capacity => Capacity, interval => Interval}, + case is_overall_limiter(Name) of + true -> + case catch esockd_limiter:lookup({Zone, Name}) of + _Info when is_map(_Info) -> + ignore; + _ -> + esockd_limiter:create({Zone, Name}, Capacity, Interval) + end, + Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end}; + _ -> + Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)} end. -init_limit(Rl) -> - maybe_apply(fun esockd_rate_limit:new/1, Rl). +-spec(info(limiter()) -> info()). +info(#limiter{zone = Zone, checkers = Cks}) -> + maps:from_list([get_info(Zone, Ck) || Ck <- Cks]). -info(#limiter{pub_limit = Pl, rate_limit = Rl}) -> - #{pub_limit => info(Pl), rate_limit => info(Rl)}; - -info(Rl) -> - maybe_apply(fun esockd_rate_limit:info/1, Rl). - -check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{pub_limit = Pl, - rate_limit = Rl}) -> - do_check([{#limiter.pub_limit, Cnt, Pl} || ?ENABLED(Pl)] ++ - [{#limiter.rate_limit, Oct, Rl} || ?ENABLED(Rl)], Limiter). - -do_check([], Limiter) -> - {ok, Limiter}; -do_check([{Pos, Cnt, Rl}|More], Limiter) -> - case esockd_rate_limit:check(Cnt, Rl) of - {0, Rl1} -> - do_check(More, setelement(Pos, Limiter, Rl1)); - {Pause, Rl1} -> - {pause, Pause, setelement(Pos, Limiter, Rl1)} +-spec(check(#{cnt := Cnt :: non_neg_integer(), + oct := Oct :: non_neg_integer()}, + Limiter :: limiter()) + -> {ok, NLimiter :: limiter()} + | {pause, MilliSecs :: non_neg_integer(), NLimiter :: limiter()}). +check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{checkers = Cks}) -> + {Pauses, NCks} = do_check(Cnt, Oct, Cks, [], []), + case lists:max(Pauses) of + I when I > 0 -> + {pause, I, Limiter#limiter{checkers = NCks}}; + _ -> + {ok, Limiter#limiter{checkers = NCks}} end. +%% @private +do_check(_, _, [], Pauses, NCks) -> + {Pauses, lists:reverse(NCks)}; +do_check(Pubs, Bytes, [Ck|More], Pauses, Acc) -> + {I, NConsumer} = consume(Pubs, Bytes, Ck), + do_check(Pubs, Bytes, More, [I|Pauses], [Ck#{consumer := NConsumer}|Acc]). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> + Tokens = case is_message_limiter(Name) of true -> Pubs; _ -> Bytes end, + case Tokens =:= 0 of + true -> + {0, Cons}; + _ -> + case is_overall_limiter(Name) of + true -> + {_, Intv} = Cons(Tokens), + {Intv, Cons}; + _ -> + esockd_rate_limit:check(Tokens, Cons) + end + end. + +get_info(Zone, #{name := Name, capacity := Cap, + interval := Intv, consumer := Cons}) -> + Info = case is_overall_limiter(Name) of + true -> esockd_limiter:lookup({Zone, Name}); + _ -> esockd_rate_limit:info(Cons) + end, + {Name, #{capacity => Cap, + interval => Intv, + tokens => maps:get(tokens, Info)}}. + +is_overall_limiter(overall_bytes_in) -> true; +is_overall_limiter(overall_messages_in) -> true; +is_overall_limiter(_) -> false. + +is_message_limiter(conn_messages_in) -> true; +is_message_limiter(overall_messages_in) -> true; +is_message_limiter(_) -> false. + diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 25eb8ff6c..7c47c38c8 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -222,7 +222,9 @@ websocket_init([Req, Opts]) -> }, Zone = proplists:get_value(zone, Opts), PubLimit = emqx_zone:publish_limit(Zone), - Limiter = emqx_limiter:init([{pub_limit, PubLimit}|Opts]), + BytesIn = proplists:get_value(rate_limit, Opts), + RateLimit = emqx_zone:ratelimit(Zone), + Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 311ccaa77..6e49e2210 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -28,6 +28,7 @@ -compile({inline, [ idle_timeout/1 , publish_limit/1 + , ratelimit/1 , mqtt_frame_options/1 , mqtt_strict_mode/1 , max_packet_size/1 @@ -55,7 +56,9 @@ %% Zone Option API -export([ idle_timeout/1 + %% XXX: Dedeprecated at v4.2 , publish_limit/1 + , ratelimit/1 , mqtt_frame_options/1 , mqtt_strict_mode/1 , max_packet_size/1 @@ -137,6 +140,10 @@ idle_timeout(Zone) -> publish_limit(Zone) -> get_env(Zone, publish_limit). +-spec(ratelimit(zone()) -> [emqx_limiter:specs()]). +ratelimit(Zone) -> + get_env(Zone, ratelimit, []). + -spec(mqtt_frame_options(zone()) -> emqx_frame:options()). mqtt_frame_options(Zone) -> #{strict_mode => mqtt_strict_mode(Zone), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 8e1362b32..a2d300e9b 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -104,7 +104,7 @@ t_info(_) -> socktype := tcp}, SockInfo). t_info_limiter(_) -> - St = st(#{limiter => emqx_limiter:init([])}), + St = st(#{limiter => emqx_limiter:init(external, [])}), ?assertEqual(undefined, emqx_connection:info(limiter, St)). t_stats(_) -> @@ -279,11 +279,11 @@ t_ensure_rate_limit(_) -> State = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => undefined})), ?assertEqual(undefined, emqx_connection:info(limiter, State)), - ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init([])} end), + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {ok, emqx_limiter:init(external, [])} end), State1 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), ?assertEqual(undefined, emqx_connection:info(limiter, State1)), - ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init([])} end), + ok = meck:expect(emqx_limiter, check, fun(_, _) -> {pause, 3000, emqx_limiter:init(external, [])} end), State2 = emqx_connection:ensure_rate_limit(#{}, st(#{limiter => #{}})), ?assertEqual(undefined, emqx_connection:info(limiter, State2)), ?assertEqual(blocked, emqx_connection:info(sockstate, State2)). diff --git a/test/emqx_limiter_SUITE.erl b/test/emqx_limiter_SUITE.erl index 6c76bab8e..75989336a 100644 --- a/test/emqx_limiter_SUITE.erl +++ b/test/emqx_limiter_SUITE.erl @@ -21,38 +21,57 @@ -include_lib("eunit/include/eunit.hrl"). +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + all() -> emqx_ct:all(?MODULE). -init_per_testcase(_TestCase, Config) -> - Config. +init_per_testcase(_, Cfg) -> + {ok, _} = esockd_limiter:start_link(), + Cfg. -end_per_testcase(_TestCase, _Config) -> - ok. +end_per_testcase(_, _) -> + esockd_limiter:stop(). -t_info(_) -> - #{pub_limit := #{rate := 1, - burst := 10, - tokens := 10 - }, - rate_limit := #{rate := 100, - burst := 1000, - tokens := 1000 - } - } = emqx_limiter:info(limiter()). +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- -t_check(_) -> - lists:foreach(fun(I) -> - {ok, Limiter} = emqx_limiter:check(#{cnt => I, oct => I*100}, limiter()), - #{pub_limit := #{tokens := Cnt}, - rate_limit := #{tokens := Oct} - } = emqx_limiter:info(Limiter), - ?assertEqual({10 - I, 1000 - I*100}, {Cnt, Oct}) - end, lists:seq(1, 10)). +t_init(_) -> + Cap1 = 1000, Intv1 = 10, + Cap2 = 2000, Intv2 = 15, + undefined = emqx_limiter:init(external, undefined, undefined, []), + ?assertEqual(emqx_limiter:init(external, undefined, undefined, [{conn_messages_in, {Cap1, Intv1}}, + {conn_bytes_in, {Cap2, Intv2}}]), + emqx_limiter:init(external, {Cap1, Intv1}, {Cap2, Intv2}, [])), + #{conn_bytes_in := #{capacity := Cap2, interval := Intv2, tokens := Cap2 }} = + emqx_limiter:info( + emqx_limiter:init(external, undefined, {Cap1, Intv1}, [{conn_bytes_in, {Cap2, Intv2}}])). -t_check_pause(_) -> - {pause, 1000, _} = emqx_limiter:check(#{cnt => 11, oct => 2000}, limiter()), - {pause, 2000, _} = emqx_limiter:check(#{cnt => 10, oct => 1200}, limiter()). +t_check_conn(_) -> + Limiter = emqx_limiter:init(external, [{conn_bytes_in, {100, 1}}]), -limiter() -> - emqx_limiter:init([{pub_limit, {1, 10}}, {rate_limit, {100, 1000}}]). + {ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter), + #{conn_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2), + + {pause, 10, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter), + #{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter3), + + {pause, 100000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3), + #{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4). + +t_check_overall(_) -> + Limiter = emqx_limiter:init(external, [{overall_bytes_in, {100, 1}}]), + + {ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter), + #{overall_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2), + + %% XXX: P = 1/r = 1/100 * 1000 = 10ms ? + {pause, 1000, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter), + #{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter2), + + %% XXX: P = 10000/r = 10000/100 * 1000 = 100s ? + {pause, 1000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3), + #{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4). diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index eccdcd677..f31da691f 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -112,7 +112,7 @@ t_info(_) -> } = SockInfo. t_info_limiter(_) -> - St = st(#{limiter => emqx_limiter:init([])}), + St = st(#{limiter => emqx_limiter:init(external, [])}), ?assertEqual(undefined, ?ws_conn:info(limiter, St)). t_info_channel(_) -> @@ -291,9 +291,7 @@ t_handle_timeout_emit_stats(_) -> ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)). t_ensure_rate_limit(_) -> - Limiter = emqx_limiter:init([{pub_limit, {1, 10}}, - {rate_limit, {100, 1000}} - ]), + Limiter = emqx_limiter:init(external, {1, 10}, {100, 1000}, []), St = st(#{limiter => Limiter}), St1 = ?ws_conn:ensure_rate_limit(#{cnt => 0, oct => 0}, St), St2 = ?ws_conn:ensure_rate_limit(#{cnt => 11, oct => 1200}, St1), diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 2d74ec6fb..b67adb9d1 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -193,6 +193,7 @@ t_batch_subscribe(_) -> ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, <<"t2">>, <<"t3">>]), + file:delete(TempAcl), emqtt:disconnect(Client). t_connect_will_retain(_) -> @@ -253,7 +254,7 @@ t_connect_limit_timeout(_) -> end), Topic = nth(1, ?TOPICS), - emqx_zone:set_env(external, publish_limit, {2.0, 3}), + emqx_zone:set_env(external, publish_limit, {3, 5}), {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), @@ -263,11 +264,11 @@ t_connect_limit_timeout(_) -> ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), - ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), timer:sleep(200), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), ok = emqtt:disconnect(Client), + emqx_zone:set_env(external, publish_limit, undefined), meck:unload(proplists). t_connect_emit_stats_timeout(_) ->