From 98faa698f32a7d7930e4e4a3c8b329d04a464f45 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 8 Aug 2020 14:51:38 +0800 Subject: [PATCH] feature(quota): add quota confs to limit the number of message forwards --- etc/emqx.conf | 20 ++++---- priv/emqx.schema | 27 +++++------ src/emqx_broker.erl | 38 +++++++-------- src/emqx_channel.erl | 85 ++++++++++++++++++++++++++-------- src/emqx_limiter.erl | 18 +++---- src/emqx_shared_sub.erl | 3 +- src/emqx_types.erl | 2 +- src/emqx_zone.erl | 6 +++ test/emqx_channel_SUITE.erl | 58 ++++++++++++++++++++++- test/emqx_limiter_SUITE.erl | 14 +++--- test/emqx_shared_sub_SUITE.erl | 2 +- 11 files changed, 189 insertions(+), 84 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 6b67f34b9..59ae3be38 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -846,23 +846,25 @@ zone.external.enable_flapping_detect = off ## ## Value: Number,Duration ## Example: 100 messages per 10 seconds. -#zone.external.conn_rate_limit.messages_in = 100, 10s +#zone.external.rate_limit.conn_messages_in = 100, 10s ## Bytes limit for a external MQTT connections. ## ## Value: Number,Duration ## Example: 100KB incoming per 10 seconds. -#zone.external.conn_rate_limit.bytes_in = 100KB, 10s +#zone.external.rate_limit.conn_bytes_in = 100KB, 10s -## Message limit for the all external MQTT connections. +## Messages quota for the external MQTT connections. This value +## consumed by the number of PUBLISH message forwarding times. ## -## Value: Number,Duration -#zone.external.overall_rate_limit.messages_in = 200000, 1s - -## Bytes limit for the all external MQTT connections. +## This option allows you to configure up to 2 parameters that are split by '|'. +## The former represents the quota per connection, while the latter represents +## the quota for all external zone's connections. ## -## Value: Number,Duration -#zone.external.overall_rate_limit.bytes_in = 2048MB, 1s +## Value: Number, Duration +## +## Example: 100 messaegs per 1s and 200000 messages per 1s in zone +#zone.external.quota.routing.messages = 100,1s | 200000,1s ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 20d2a09a0..38aa55998 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -992,19 +992,15 @@ end}. {default, off} ]}. -{mapping, "zone.$name.conn_rate_limit.messages_in", "emqx.zones", [ +{mapping, "zone.$name.rate_limit.messages_in", "emqx.zones", [ {datatype, string} ]}. -{mapping, "zone.$name.conn_rate_limit.bytes_in", "emqx.zones", [ +{mapping, "zone.$name.rate_limit.bytes_in", "emqx.zones", [ {datatype, string} ]}. -{mapping, "zone.$name.overall_rate_limit.messages_in", "emqx.zones", [ - {datatype, string} -]}. - -{mapping, "zone.$name.overall_rate_limit.bytes_in", "emqx.zones", [ +{mapping, "zone.$name.quota.routing.messages", "emqx.zones", [ {datatype, string} ]}. @@ -1113,14 +1109,19 @@ end}. {mountpoint, iolist_to_binary(Val)}; (["response_information"], Val) -> {response_information, iolist_to_binary(Val)}; - (["conn_rate_limit", "messages_in"], Val) -> + (["rate_limit", "conn_messages_in"], Val) -> {ratelimit, {conn_messages_in, Ratelimit(Val)}}; - (["conn_rate_limit", "bytes_in"], Val) -> + (["rate_limit", "conn_bytes_in"], Val) -> {ratelimit, {conn_bytes_in, Ratelimit(Val)}}; - (["overall_rate_limit", "messages_in"], Val) -> - {ratelimit, {overall_messages_in, Ratelimit(Val)}}; - (["overall_rate_limit", "bytes_in"], Val) -> - {ratelimit, {overall_bytes_in, Ratelimit(Val)}}; + (["quota", "routing", "messages"], Val) -> + Policy = case string:tokens(Val, "|") of + [T] -> + [{conn_messages_routing, Ratelimit(T)}]; + [T1, T2] -> + [{conn_messages_routing, Ratelimit(T1)}, + {overall_messages_routing, Ratelimit(T2)}] + end, + {quota, Policy}; ([Opt], Val) -> {list_to_atom(Opt), Val} end, diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index c5d9c6793..85c6ad070 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -282,37 +282,31 @@ forward(Node, To, Delivery, sync) -> -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Topic, #delivery{message = Msg}) -> - case subscribers(Topic) of - [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Msg), - {error, no_subscribers}; - [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg); - Subs -> lists:foldl( - fun(Sub, Res) -> - case dispatch(Sub, Topic, Msg) of - ok -> Res; - Err -> Err - end - end, ok, Subs) + DispN = lists:foldl( + fun(Sub, N) -> + N + dispatch(Sub, Topic, Msg) + end, 0, subscribers(Topic)), + case DispN of + 0 -> + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), + ok = inc_dropped_cnt(Msg), + {error, no_subscribers}; + _ -> + {ok, DispN} end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> - SubPid ! {deliver, Topic, Msg}, - ok; - false -> {error, subscriber_die} + SubPid ! {deliver, Topic, Msg}, 1; + false -> 0 end; dispatch({shard, I}, Topic, Msg) -> lists:foldl( - fun(SubPid, Res) -> - case dispatch(SubPid, Topic, Msg) of - ok -> Res; - Err -> Err - end - end, ok, subscribers({shard, Topic, I})). + fun(SubPid, N) -> + N + dispatch(SubPid, Topic, Msg) + end, 0, subscribers({shard, Topic, I})). -compile({inline, [inc_dropped_cnt/1]}). inc_dropped_cnt(Msg) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c0a22474b..6259a0f55 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -73,6 +73,8 @@ alias_maximum :: maybe(map()), %% Authentication Data Cache auth_cache :: maybe(map()), + %% Quota checkers + quota :: maybe(emqx_limiter:limiter()), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Conn State @@ -103,7 +105,8 @@ retry_timer => retry_delivery, await_timer => expire_awaiting_rel, expire_timer => expire_session, - will_timer => will_message + will_timer => will_message, + quota_timer => reset_quota_flag }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -167,6 +170,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, Peercert = maps:get(peercert, ConnInfo, undefined), Protocol = maps:get(protocol, ConnInfo, mqtt), MountPoint = emqx_zone:mountpoint(Zone), + QuotaPolicy = emqx_zone:quota_policy(Zone), ClientInfo = setting_peercert_infos( Peercert, #{zone => Zone, @@ -185,6 +189,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, outbound => #{} }, auth_cache = #{}, + quota = emqx_limiter:init(Zone, QuotaPolicy), timers = #{}, conn_state = idle, takeover = false, @@ -441,7 +446,8 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{clientinfo = #{zone := Zone}}) -> - case pipeline([fun process_alias/2, + case pipeline([fun check_quota_exceeded/2, + fun process_alias/2, fun check_pub_alias/2, fun check_pub_acl/2, fun check_pub_caps/2 @@ -449,23 +455,35 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), {ok, NPacket, NChannel} -> Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); - {error, ReasonCode, NChannel} when ReasonCode =:= ?RC_NOT_AUTHORIZED -> + {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", - [Topic, emqx_reason_codes:text(ReasonCode)]), + [Topic, emqx_reason_codes:text(Rc)]), case emqx_zone:get_env(Zone, acl_deny_action, ignore) of ignore -> case QoS of ?QOS_0 -> {ok, NChannel}; _ -> - handle_out(puback, {PacketId, ReasonCode}, NChannel) + handle_out(puback, {PacketId, Rc}, NChannel) end; disconnect -> - handle_out(disconnect, ReasonCode, NChannel) + handle_out(disconnect, Rc, NChannel) end; - {error, ReasonCode, NChannel} -> + {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} -> + ?LOG(warning, "Cannot publish messages to ~s due to ~s.", + [Topic, emqx_reason_codes:text(Rc)]), + case QoS of + ?QOS_0 -> + ok = emqx_metrics:inc('packets.publish.dropped'), + {ok, NChannel}; + ?QOS_1 -> + handle_out(puback, {PacketId, Rc}, NChannel); + ?QOS_2 -> + handle_out(pubrec, {PacketId, Rc}, NChannel) + end; + {error, Rc, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", - [Topic, emqx_reason_codes:text(ReasonCode)]), - handle_out(disconnect, ReasonCode, NChannel) + [Topic, emqx_reason_codes:text(Rc)]), + handle_out(disconnect, Rc, NChannel) end. packet_to_message(Packet, #channel{ @@ -487,22 +505,24 @@ packet_to_message(Packet, #channel{ peerhost => PeerHost})). do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> - _ = emqx_broker:publish(Msg), - {ok, Channel}; + Result = emqx_broker:publish(Msg), + NChannel = ensure_quota(Result, Channel), + {ok, NChannel}; do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> - Results = emqx_broker:publish(Msg), - RC = puback_reason_code(Results), - handle_out(puback, {PacketId, RC}, Channel); + PubRes = emqx_broker:publish(Msg), + RC = puback_reason_code(PubRes), + NChannel = ensure_quota(PubRes, Channel), + handle_out(puback, {PacketId, RC}, NChannel); do_publish(PacketId, Msg = #message{qos = ?QOS_2}, Channel = #channel{session = Session}) -> case emqx_session:publish(PacketId, Msg, Session) of - {ok, Results, NSession} -> - RC = puback_reason_code(Results), - NChannel = Channel#channel{session = NSession}, - NChannel1 = ensure_timer(await_timer, NChannel), - handle_out(pubrec, {PacketId, RC}, NChannel1); + {ok, PubRes, NSession} -> + RC = puback_reason_code(PubRes), + NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}), + NChannel2 = ensure_quota(PubRes, NChannel1), + handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.publish.inuse'), handle_out(pubrec, {PacketId, RC}, Channel); @@ -513,6 +533,21 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, handle_out(pubrec, {PacketId, RC}, Channel) end. +ensure_quota(_, Channel = #channel{quota = undefined}) -> + Channel; +ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> + Cnt = lists:foldl( + fun({_, _, ok}, N) -> N + 1; + ({_, _, {ok, I}}, N) -> N + I; + (_, N) -> N + end, 1, PubRes), + case emqx_limiter:check(#{cnt => Cnt, oct => 0}, Limiter) of + {ok, NLimiter} -> + Channel#channel{quota = NLimiter}; + {pause, Intv, NLimiter} -> + ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter}) + end. + -compile({inline, [puback_reason_code/1]}). puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([_|_]) -> ?RC_SUCCESS. @@ -927,6 +962,9 @@ handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; +handle_timeout(_TRef, reset_quota_flag, Channel) -> + {ok, clean_timer(quota_timer, Channel)}; + handle_timeout(_TRef, Msg, Channel) -> ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), {ok, Channel}. @@ -1243,6 +1281,15 @@ packing_alias(Packet = #mqtt_packet{ end; packing_alias(Packet, Channel) -> {Packet, Channel}. +%%-------------------------------------------------------------------- +%% Check quota state + +check_quota_exceeded(_, #channel{timers = Timers}) -> + case maps:get(quota_timer, Timers, undefined) of + undefined -> ok; + _ -> {error, ?RC_QUOTA_EXCEEDED} + end. + %%-------------------------------------------------------------------- %% Check Pub Alias diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index bea0de2fb..5d0d4fe52 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -14,6 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% Ratelimit or Quota checker -module(emqx_limiter). -include("types.hrl"). @@ -27,7 +28,7 @@ -record(limiter, { %% Zone zone :: emqx_zone:zone(), - %% All checkers + %% Checkers checkers :: [checker()] }). @@ -39,8 +40,8 @@ -type(name() :: conn_bytes_in | conn_messages_in - | overall_bytes_in - | overall_messages_in + | conn_messages_routing + | overall_messages_routing ). -type(spec() :: {name(), esockd_rate_limit:config()}). @@ -62,9 +63,9 @@ maybe(esockd_rate_limit:config()), maybe(esockd_rate_limit:config()), specs()) -> maybe(limiter())). -init(Zone, PubLimit, BytesIn, RateLimit) -> +init(Zone, PubLimit, BytesIn, Specs) -> Merged = maps:merge(#{conn_messages_in => PubLimit, - conn_bytes_in => BytesIn}, maps:from_list(RateLimit)), + conn_bytes_in => BytesIn}, maps:from_list(Specs)), Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged), init(Zone, maps:to_list(Filtered)). @@ -144,11 +145,10 @@ get_info(Zone, #{name := Name, capacity := Cap, interval => Intv, tokens => maps:get(tokens, Info)}}. -is_overall_limiter(overall_bytes_in) -> true; -is_overall_limiter(overall_messages_in) -> true; +is_overall_limiter(overall_messages_routing) -> true; is_overall_limiter(_) -> false. is_message_limiter(conn_messages_in) -> true; -is_message_limiter(overall_messages_in) -> true; +is_message_limiter(conn_messages_routing) -> true; +is_message_limiter(overall_messages_routing) -> true; is_message_limiter(_) -> false. - diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 3563b1513..71372cd90 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -117,8 +117,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Topic, Msg, Type) of - ok -> - ok; + ok -> {ok, 1}; {error, _Reason} -> %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 8de122bc6..9bc1d2e87 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -193,7 +193,7 @@ -type(banned() :: #banned{}). -type(deliver() :: {deliver, topic(), message()}). -type(delivery() :: #delivery{}). --type(deliver_result() :: ok | {error, term()}). +-type(deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}). -type(publish_result() :: [{node(), topic(), deliver_result()} | {share, topic(), deliver_result()}]). -type(route() :: #route{}). diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 6e49e2210..6cec6c1ee 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -47,6 +47,7 @@ , force_gc_policy/1 , force_shutdown_policy/1 , response_information/1 + , quota_policy/1 , get_env/2 , get_env/3 ]}). @@ -77,6 +78,7 @@ , force_gc_policy/1 , force_shutdown_policy/1 , response_information/1 + , quota_policy/1 ]). -export([ init_gc_state/1 @@ -217,6 +219,10 @@ force_shutdown_policy(Zone) -> response_information(Zone) -> get_env(Zone, response_information). +-spec(quota_policy(zone()) -> emqx_quota:policy()). +quota_policy(Zone) -> + get_env(Zone, quota, []). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index bb6374d8c..fe269c04d 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -171,7 +171,7 @@ t_handle_in_qos1_publish(_) -> emqx_channel:handle_in(Publish, channel(#{conn_state => connected})). t_handle_in_qos2_publish(_) -> - ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, 1}] end), + ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end), Channel = channel(#{conn_state => connected, session => session()}), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} = @@ -355,6 +355,57 @@ t_process_unsubscribe(_) -> TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, #{}, channel()). +t_quota_qos0(_) -> + esockd_limiter:start_link(), Cnter = counters:new(1, []), + ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), + ok = meck:expect(emqx_metrics, inc, fun('packets.publish.dropped') -> counters:add(Cnter, 1, 1) end), + ok = meck:expect(emqx_metrics, val, fun('packets.publish.dropped') -> counters:get(Cnter, 1) end), + Chann = channel(#{conn_state => connected, quota => quota()}), + Pub = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>), + + M1 = emqx_metrics:val('packets.publish.dropped'), + {ok, Chann1} = emqx_channel:handle_in(Pub, Chann), + {ok, Chann2} = emqx_channel:handle_in(Pub, Chann1), + M1 = emqx_metrics:val('packets.publish.dropped') - 1, + {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, _} = emqx_channel:handle_in(Pub, Chann3), + M1 = emqx_metrics:val('packets.publish.dropped') - 1, + + ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), + esockd_limiter:stop(). + +t_quota_qos1(_) -> + esockd_limiter:start_link(), + ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), + Chann = channel(#{conn_state => connected, quota => quota()}), + Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), + %% Quota per connections + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub, Chann), + {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub, Chann1), + {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3), + %% Quota in overall + {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4), + esockd_limiter:stop(). + +t_quota_qos2(_) -> + esockd_limiter:start_link(), + ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), + Chann = channel(#{conn_state => connected, quota => quota()}), + Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), + Pub2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>), + Pub3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>), + Pub4 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 4, <<"payload">>), + %% Quota per connections + {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub1, Chann), + {ok, ?PUBREC_PACKET(2, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub2, Chann1), + {ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2), + {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3), + %% Quota in overall + {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4), + esockd_limiter:stop(). + %%-------------------------------------------------------------------- %% Test cases for handle_deliver %%-------------------------------------------------------------------- @@ -697,3 +748,8 @@ session(InitFields) when is_map(InitFields) -> emqx_session:init(#{zone => channel}, #{receive_maximum => 0}), InitFields). +%% conn: 5/s; overall: 10/s +quota() -> + emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, + {overall_messages_routing, {10, 1}}]). + diff --git a/test/emqx_limiter_SUITE.erl b/test/emqx_limiter_SUITE.erl index 0d501135b..a818e0d15 100644 --- a/test/emqx_limiter_SUITE.erl +++ b/test/emqx_limiter_SUITE.erl @@ -62,16 +62,16 @@ t_check_conn(_) -> #{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4). t_check_overall(_) -> - Limiter = emqx_limiter:init(external, [{overall_bytes_in, {100, 1}}]), + Limiter = emqx_limiter:init(external, [{overall_messages_routing, {100, 1}}]), - {ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter), - #{overall_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2), + {ok, Limiter2} = emqx_limiter:check(#{cnt => 1, oct => 0}, Limiter), + #{overall_messages_routing := #{tokens := 99}} = emqx_limiter:info(Limiter2), %% XXX: P = 1/r = 1/100 * 1000 = 10ms ? - {pause, 1000, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter), - #{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter2), + {pause, _, Limiter3} = emqx_limiter:check(#{cnt => 100, oct => 0}, Limiter), + #{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter2), %% XXX: P = 10000/r = 10000/100 * 1000 = 100s ? - {pause, 1000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3), - #{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4). + {pause, _, Limiter4} = emqx_limiter:check(#{cnt => 10000, oct => 0}, Limiter3), + #{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter4). diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 22382df6c..218e8a073 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -252,7 +252,7 @@ t_dispatch(_) -> Topic = <<"foo">>, ?assertEqual({error, no_subscribers}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})), emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}), - ?assertEqual(ok, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})). + ?assertEqual({ok, 1}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})). % t_unsubscribe(_) -> % error('TODO').