feature(quota): add quota confs to limit the number of message forwards
This commit is contained in:
parent
89377aeaa2
commit
98faa698f3
|
@ -846,23 +846,25 @@ zone.external.enable_flapping_detect = off
|
||||||
##
|
##
|
||||||
## Value: Number,Duration
|
## Value: Number,Duration
|
||||||
## Example: 100 messages per 10 seconds.
|
## Example: 100 messages per 10 seconds.
|
||||||
#zone.external.conn_rate_limit.messages_in = 100, 10s
|
#zone.external.rate_limit.conn_messages_in = 100, 10s
|
||||||
|
|
||||||
## Bytes limit for a external MQTT connections.
|
## Bytes limit for a external MQTT connections.
|
||||||
##
|
##
|
||||||
## Value: Number,Duration
|
## Value: Number,Duration
|
||||||
## Example: 100KB incoming per 10 seconds.
|
## Example: 100KB incoming per 10 seconds.
|
||||||
#zone.external.conn_rate_limit.bytes_in = 100KB, 10s
|
#zone.external.rate_limit.conn_bytes_in = 100KB, 10s
|
||||||
|
|
||||||
## Message limit for the all external MQTT connections.
|
## Messages quota for the external MQTT connections. This value
|
||||||
|
## consumed by the number of PUBLISH message forwarding times.
|
||||||
##
|
##
|
||||||
## Value: Number,Duration
|
## This option allows you to configure up to 2 parameters that are split by '|'.
|
||||||
#zone.external.overall_rate_limit.messages_in = 200000, 1s
|
## The former represents the quota per connection, while the latter represents
|
||||||
|
## the quota for all external zone's connections.
|
||||||
## Bytes limit for the all external MQTT connections.
|
|
||||||
##
|
##
|
||||||
## Value: Number,Duration
|
## Value: Number, Duration
|
||||||
#zone.external.overall_rate_limit.bytes_in = 2048MB, 1s
|
##
|
||||||
|
## Example: 100 messaegs per 1s and 200000 messages per 1s in zone
|
||||||
|
#zone.external.quota.routing.messages = 100,1s | 200000,1s
|
||||||
|
|
||||||
## All the topics will be prefixed with the mountpoint path if this option is enabled.
|
## All the topics will be prefixed with the mountpoint path if this option is enabled.
|
||||||
##
|
##
|
||||||
|
|
|
@ -992,19 +992,15 @@ end}.
|
||||||
{default, off}
|
{default, off}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "zone.$name.conn_rate_limit.messages_in", "emqx.zones", [
|
{mapping, "zone.$name.rate_limit.messages_in", "emqx.zones", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "zone.$name.conn_rate_limit.bytes_in", "emqx.zones", [
|
{mapping, "zone.$name.rate_limit.bytes_in", "emqx.zones", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "zone.$name.overall_rate_limit.messages_in", "emqx.zones", [
|
{mapping, "zone.$name.quota.routing.messages", "emqx.zones", [
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "zone.$name.overall_rate_limit.bytes_in", "emqx.zones", [
|
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
@ -1113,14 +1109,19 @@ end}.
|
||||||
{mountpoint, iolist_to_binary(Val)};
|
{mountpoint, iolist_to_binary(Val)};
|
||||||
(["response_information"], Val) ->
|
(["response_information"], Val) ->
|
||||||
{response_information, iolist_to_binary(Val)};
|
{response_information, iolist_to_binary(Val)};
|
||||||
(["conn_rate_limit", "messages_in"], Val) ->
|
(["rate_limit", "conn_messages_in"], Val) ->
|
||||||
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
|
{ratelimit, {conn_messages_in, Ratelimit(Val)}};
|
||||||
(["conn_rate_limit", "bytes_in"], Val) ->
|
(["rate_limit", "conn_bytes_in"], Val) ->
|
||||||
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
|
{ratelimit, {conn_bytes_in, Ratelimit(Val)}};
|
||||||
(["overall_rate_limit", "messages_in"], Val) ->
|
(["quota", "routing", "messages"], Val) ->
|
||||||
{ratelimit, {overall_messages_in, Ratelimit(Val)}};
|
Policy = case string:tokens(Val, "|") of
|
||||||
(["overall_rate_limit", "bytes_in"], Val) ->
|
[T] ->
|
||||||
{ratelimit, {overall_bytes_in, Ratelimit(Val)}};
|
[{conn_messages_routing, Ratelimit(T)}];
|
||||||
|
[T1, T2] ->
|
||||||
|
[{conn_messages_routing, Ratelimit(T1)},
|
||||||
|
{overall_messages_routing, Ratelimit(T2)}]
|
||||||
|
end,
|
||||||
|
{quota, Policy};
|
||||||
([Opt], Val) ->
|
([Opt], Val) ->
|
||||||
{list_to_atom(Opt), Val}
|
{list_to_atom(Opt), Val}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -282,37 +282,31 @@ forward(Node, To, Delivery, sync) ->
|
||||||
|
|
||||||
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
||||||
dispatch(Topic, #delivery{message = Msg}) ->
|
dispatch(Topic, #delivery{message = Msg}) ->
|
||||||
case subscribers(Topic) of
|
DispN = lists:foldl(
|
||||||
[] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
fun(Sub, N) ->
|
||||||
|
N + dispatch(Sub, Topic, Msg)
|
||||||
|
end, 0, subscribers(Topic)),
|
||||||
|
case DispN of
|
||||||
|
0 ->
|
||||||
|
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||||
ok = inc_dropped_cnt(Msg),
|
ok = inc_dropped_cnt(Msg),
|
||||||
{error, no_subscribers};
|
{error, no_subscribers};
|
||||||
[Sub] -> %% optimize?
|
_ ->
|
||||||
dispatch(Sub, Topic, Msg);
|
{ok, DispN}
|
||||||
Subs -> lists:foldl(
|
|
||||||
fun(Sub, Res) ->
|
|
||||||
case dispatch(Sub, Topic, Msg) of
|
|
||||||
ok -> Res;
|
|
||||||
Err -> Err
|
|
||||||
end
|
|
||||||
end, ok, Subs)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
||||||
case erlang:is_process_alive(SubPid) of
|
case erlang:is_process_alive(SubPid) of
|
||||||
true ->
|
true ->
|
||||||
SubPid ! {deliver, Topic, Msg},
|
SubPid ! {deliver, Topic, Msg}, 1;
|
||||||
ok;
|
false -> 0
|
||||||
false -> {error, subscriber_die}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
dispatch({shard, I}, Topic, Msg) ->
|
dispatch({shard, I}, Topic, Msg) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(SubPid, Res) ->
|
fun(SubPid, N) ->
|
||||||
case dispatch(SubPid, Topic, Msg) of
|
N + dispatch(SubPid, Topic, Msg)
|
||||||
ok -> Res;
|
end, 0, subscribers({shard, Topic, I})).
|
||||||
Err -> Err
|
|
||||||
end
|
|
||||||
end, ok, subscribers({shard, Topic, I})).
|
|
||||||
|
|
||||||
-compile({inline, [inc_dropped_cnt/1]}).
|
-compile({inline, [inc_dropped_cnt/1]}).
|
||||||
inc_dropped_cnt(Msg) ->
|
inc_dropped_cnt(Msg) ->
|
||||||
|
|
|
@ -73,6 +73,8 @@
|
||||||
alias_maximum :: maybe(map()),
|
alias_maximum :: maybe(map()),
|
||||||
%% Authentication Data Cache
|
%% Authentication Data Cache
|
||||||
auth_cache :: maybe(map()),
|
auth_cache :: maybe(map()),
|
||||||
|
%% Quota checkers
|
||||||
|
quota :: maybe(emqx_limiter:limiter()),
|
||||||
%% Timers
|
%% Timers
|
||||||
timers :: #{atom() => disabled | maybe(reference())},
|
timers :: #{atom() => disabled | maybe(reference())},
|
||||||
%% Conn State
|
%% Conn State
|
||||||
|
@ -103,7 +105,8 @@
|
||||||
retry_timer => retry_delivery,
|
retry_timer => retry_delivery,
|
||||||
await_timer => expire_awaiting_rel,
|
await_timer => expire_awaiting_rel,
|
||||||
expire_timer => expire_session,
|
expire_timer => expire_session,
|
||||||
will_timer => will_message
|
will_timer => will_message,
|
||||||
|
quota_timer => reset_quota_flag
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
||||||
|
@ -167,6 +170,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||||
Peercert = maps:get(peercert, ConnInfo, undefined),
|
Peercert = maps:get(peercert, ConnInfo, undefined),
|
||||||
Protocol = maps:get(protocol, ConnInfo, mqtt),
|
Protocol = maps:get(protocol, ConnInfo, mqtt),
|
||||||
MountPoint = emqx_zone:mountpoint(Zone),
|
MountPoint = emqx_zone:mountpoint(Zone),
|
||||||
|
QuotaPolicy = emqx_zone:quota_policy(Zone),
|
||||||
ClientInfo = setting_peercert_infos(
|
ClientInfo = setting_peercert_infos(
|
||||||
Peercert,
|
Peercert,
|
||||||
#{zone => Zone,
|
#{zone => Zone,
|
||||||
|
@ -185,6 +189,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||||
outbound => #{}
|
outbound => #{}
|
||||||
},
|
},
|
||||||
auth_cache = #{},
|
auth_cache = #{},
|
||||||
|
quota = emqx_limiter:init(Zone, QuotaPolicy),
|
||||||
timers = #{},
|
timers = #{},
|
||||||
conn_state = idle,
|
conn_state = idle,
|
||||||
takeover = false,
|
takeover = false,
|
||||||
|
@ -441,7 +446,8 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt
|
||||||
|
|
||||||
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
||||||
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
||||||
case pipeline([fun process_alias/2,
|
case pipeline([fun check_quota_exceeded/2,
|
||||||
|
fun process_alias/2,
|
||||||
fun check_pub_alias/2,
|
fun check_pub_alias/2,
|
||||||
fun check_pub_acl/2,
|
fun check_pub_acl/2,
|
||||||
fun check_pub_caps/2
|
fun check_pub_caps/2
|
||||||
|
@ -449,23 +455,35 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
||||||
{ok, NPacket, NChannel} ->
|
{ok, NPacket, NChannel} ->
|
||||||
Msg = packet_to_message(NPacket, NChannel),
|
Msg = packet_to_message(NPacket, NChannel),
|
||||||
do_publish(PacketId, Msg, NChannel);
|
do_publish(PacketId, Msg, NChannel);
|
||||||
{error, ReasonCode, NChannel} when ReasonCode =:= ?RC_NOT_AUTHORIZED ->
|
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
|
||||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
[Topic, emqx_reason_codes:text(Rc)]),
|
||||||
case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
|
case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
|
||||||
ignore ->
|
ignore ->
|
||||||
case QoS of
|
case QoS of
|
||||||
?QOS_0 -> {ok, NChannel};
|
?QOS_0 -> {ok, NChannel};
|
||||||
_ ->
|
_ ->
|
||||||
handle_out(puback, {PacketId, ReasonCode}, NChannel)
|
handle_out(puback, {PacketId, Rc}, NChannel)
|
||||||
end;
|
end;
|
||||||
disconnect ->
|
disconnect ->
|
||||||
handle_out(disconnect, ReasonCode, NChannel)
|
handle_out(disconnect, Rc, NChannel)
|
||||||
end;
|
end;
|
||||||
{error, ReasonCode, NChannel} ->
|
{error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
|
||||||
|
?LOG(warning, "Cannot publish messages to ~s due to ~s.",
|
||||||
|
[Topic, emqx_reason_codes:text(Rc)]),
|
||||||
|
case QoS of
|
||||||
|
?QOS_0 ->
|
||||||
|
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||||
|
{ok, NChannel};
|
||||||
|
?QOS_1 ->
|
||||||
|
handle_out(puback, {PacketId, Rc}, NChannel);
|
||||||
|
?QOS_2 ->
|
||||||
|
handle_out(pubrec, {PacketId, Rc}, NChannel)
|
||||||
|
end;
|
||||||
|
{error, Rc, NChannel} ->
|
||||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||||
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
[Topic, emqx_reason_codes:text(Rc)]),
|
||||||
handle_out(disconnect, ReasonCode, NChannel)
|
handle_out(disconnect, Rc, NChannel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
packet_to_message(Packet, #channel{
|
packet_to_message(Packet, #channel{
|
||||||
|
@ -487,22 +505,24 @@ packet_to_message(Packet, #channel{
|
||||||
peerhost => PeerHost})).
|
peerhost => PeerHost})).
|
||||||
|
|
||||||
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
|
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
|
||||||
_ = emqx_broker:publish(Msg),
|
Result = emqx_broker:publish(Msg),
|
||||||
{ok, Channel};
|
NChannel = ensure_quota(Result, Channel),
|
||||||
|
{ok, NChannel};
|
||||||
|
|
||||||
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
|
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
|
||||||
Results = emqx_broker:publish(Msg),
|
PubRes = emqx_broker:publish(Msg),
|
||||||
RC = puback_reason_code(Results),
|
RC = puback_reason_code(PubRes),
|
||||||
handle_out(puback, {PacketId, RC}, Channel);
|
NChannel = ensure_quota(PubRes, Channel),
|
||||||
|
handle_out(puback, {PacketId, RC}, NChannel);
|
||||||
|
|
||||||
do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:publish(PacketId, Msg, Session) of
|
case emqx_session:publish(PacketId, Msg, Session) of
|
||||||
{ok, Results, NSession} ->
|
{ok, PubRes, NSession} ->
|
||||||
RC = puback_reason_code(Results),
|
RC = puback_reason_code(PubRes),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}),
|
||||||
NChannel1 = ensure_timer(await_timer, NChannel),
|
NChannel2 = ensure_quota(PubRes, NChannel1),
|
||||||
handle_out(pubrec, {PacketId, RC}, NChannel1);
|
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
ok = emqx_metrics:inc('packets.publish.inuse'),
|
ok = emqx_metrics:inc('packets.publish.inuse'),
|
||||||
handle_out(pubrec, {PacketId, RC}, Channel);
|
handle_out(pubrec, {PacketId, RC}, Channel);
|
||||||
|
@ -513,6 +533,21 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
handle_out(pubrec, {PacketId, RC}, Channel)
|
handle_out(pubrec, {PacketId, RC}, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
ensure_quota(_, Channel = #channel{quota = undefined}) ->
|
||||||
|
Channel;
|
||||||
|
ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
|
||||||
|
Cnt = lists:foldl(
|
||||||
|
fun({_, _, ok}, N) -> N + 1;
|
||||||
|
({_, _, {ok, I}}, N) -> N + I;
|
||||||
|
(_, N) -> N
|
||||||
|
end, 1, PubRes),
|
||||||
|
case emqx_limiter:check(#{cnt => Cnt, oct => 0}, Limiter) of
|
||||||
|
{ok, NLimiter} ->
|
||||||
|
Channel#channel{quota = NLimiter};
|
||||||
|
{pause, Intv, NLimiter} ->
|
||||||
|
ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter})
|
||||||
|
end.
|
||||||
|
|
||||||
-compile({inline, [puback_reason_code/1]}).
|
-compile({inline, [puback_reason_code/1]}).
|
||||||
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
||||||
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
puback_reason_code([_|_]) -> ?RC_SUCCESS.
|
||||||
|
@ -927,6 +962,9 @@ handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
||||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||||
|
|
||||||
|
handle_timeout(_TRef, reset_quota_flag, Channel) ->
|
||||||
|
{ok, clean_timer(quota_timer, Channel)};
|
||||||
|
|
||||||
handle_timeout(_TRef, Msg, Channel) ->
|
handle_timeout(_TRef, Msg, Channel) ->
|
||||||
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
|
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -1243,6 +1281,15 @@ packing_alias(Packet = #mqtt_packet{
|
||||||
end;
|
end;
|
||||||
packing_alias(Packet, Channel) -> {Packet, Channel}.
|
packing_alias(Packet, Channel) -> {Packet, Channel}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Check quota state
|
||||||
|
|
||||||
|
check_quota_exceeded(_, #channel{timers = Timers}) ->
|
||||||
|
case maps:get(quota_timer, Timers, undefined) of
|
||||||
|
undefined -> ok;
|
||||||
|
_ -> {error, ?RC_QUOTA_EXCEEDED}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Pub Alias
|
%% Check Pub Alias
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Ratelimit or Quota checker
|
||||||
-module(emqx_limiter).
|
-module(emqx_limiter).
|
||||||
|
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
|
@ -27,7 +28,7 @@
|
||||||
-record(limiter, {
|
-record(limiter, {
|
||||||
%% Zone
|
%% Zone
|
||||||
zone :: emqx_zone:zone(),
|
zone :: emqx_zone:zone(),
|
||||||
%% All checkers
|
%% Checkers
|
||||||
checkers :: [checker()]
|
checkers :: [checker()]
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -39,8 +40,8 @@
|
||||||
|
|
||||||
-type(name() :: conn_bytes_in
|
-type(name() :: conn_bytes_in
|
||||||
| conn_messages_in
|
| conn_messages_in
|
||||||
| overall_bytes_in
|
| conn_messages_routing
|
||||||
| overall_messages_in
|
| overall_messages_routing
|
||||||
).
|
).
|
||||||
|
|
||||||
-type(spec() :: {name(), esockd_rate_limit:config()}).
|
-type(spec() :: {name(), esockd_rate_limit:config()}).
|
||||||
|
@ -62,9 +63,9 @@
|
||||||
maybe(esockd_rate_limit:config()),
|
maybe(esockd_rate_limit:config()),
|
||||||
maybe(esockd_rate_limit:config()), specs())
|
maybe(esockd_rate_limit:config()), specs())
|
||||||
-> maybe(limiter())).
|
-> maybe(limiter())).
|
||||||
init(Zone, PubLimit, BytesIn, RateLimit) ->
|
init(Zone, PubLimit, BytesIn, Specs) ->
|
||||||
Merged = maps:merge(#{conn_messages_in => PubLimit,
|
Merged = maps:merge(#{conn_messages_in => PubLimit,
|
||||||
conn_bytes_in => BytesIn}, maps:from_list(RateLimit)),
|
conn_bytes_in => BytesIn}, maps:from_list(Specs)),
|
||||||
Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
|
Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged),
|
||||||
init(Zone, maps:to_list(Filtered)).
|
init(Zone, maps:to_list(Filtered)).
|
||||||
|
|
||||||
|
@ -144,11 +145,10 @@ get_info(Zone, #{name := Name, capacity := Cap,
|
||||||
interval => Intv,
|
interval => Intv,
|
||||||
tokens => maps:get(tokens, Info)}}.
|
tokens => maps:get(tokens, Info)}}.
|
||||||
|
|
||||||
is_overall_limiter(overall_bytes_in) -> true;
|
is_overall_limiter(overall_messages_routing) -> true;
|
||||||
is_overall_limiter(overall_messages_in) -> true;
|
|
||||||
is_overall_limiter(_) -> false.
|
is_overall_limiter(_) -> false.
|
||||||
|
|
||||||
is_message_limiter(conn_messages_in) -> true;
|
is_message_limiter(conn_messages_in) -> true;
|
||||||
is_message_limiter(overall_messages_in) -> true;
|
is_message_limiter(conn_messages_routing) -> true;
|
||||||
|
is_message_limiter(overall_messages_routing) -> true;
|
||||||
is_message_limiter(_) -> false.
|
is_message_limiter(_) -> false.
|
||||||
|
|
||||||
|
|
|
@ -117,8 +117,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||||
{error, no_subscribers};
|
{error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
case do_dispatch(SubPid, Topic, Msg, Type) of
|
case do_dispatch(SubPid, Topic, Msg, Type) of
|
||||||
ok ->
|
ok -> {ok, 1};
|
||||||
ok;
|
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
%% Failed to dispatch to this sub, try next.
|
%% Failed to dispatch to this sub, try next.
|
||||||
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
|
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
|
||||||
|
|
|
@ -193,7 +193,7 @@
|
||||||
-type(banned() :: #banned{}).
|
-type(banned() :: #banned{}).
|
||||||
-type(deliver() :: {deliver, topic(), message()}).
|
-type(deliver() :: {deliver, topic(), message()}).
|
||||||
-type(delivery() :: #delivery{}).
|
-type(delivery() :: #delivery{}).
|
||||||
-type(deliver_result() :: ok | {error, term()}).
|
-type(deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}).
|
||||||
-type(publish_result() :: [{node(), topic(), deliver_result()} |
|
-type(publish_result() :: [{node(), topic(), deliver_result()} |
|
||||||
{share, topic(), deliver_result()}]).
|
{share, topic(), deliver_result()}]).
|
||||||
-type(route() :: #route{}).
|
-type(route() :: #route{}).
|
||||||
|
|
|
@ -47,6 +47,7 @@
|
||||||
, force_gc_policy/1
|
, force_gc_policy/1
|
||||||
, force_shutdown_policy/1
|
, force_shutdown_policy/1
|
||||||
, response_information/1
|
, response_information/1
|
||||||
|
, quota_policy/1
|
||||||
, get_env/2
|
, get_env/2
|
||||||
, get_env/3
|
, get_env/3
|
||||||
]}).
|
]}).
|
||||||
|
@ -77,6 +78,7 @@
|
||||||
, force_gc_policy/1
|
, force_gc_policy/1
|
||||||
, force_shutdown_policy/1
|
, force_shutdown_policy/1
|
||||||
, response_information/1
|
, response_information/1
|
||||||
|
, quota_policy/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ init_gc_state/1
|
-export([ init_gc_state/1
|
||||||
|
@ -217,6 +219,10 @@ force_shutdown_policy(Zone) ->
|
||||||
response_information(Zone) ->
|
response_information(Zone) ->
|
||||||
get_env(Zone, response_information).
|
get_env(Zone, response_information).
|
||||||
|
|
||||||
|
-spec(quota_policy(zone()) -> emqx_quota:policy()).
|
||||||
|
quota_policy(Zone) ->
|
||||||
|
get_env(Zone, quota, []).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -171,7 +171,7 @@ t_handle_in_qos1_publish(_) ->
|
||||||
emqx_channel:handle_in(Publish, channel(#{conn_state => connected})).
|
emqx_channel:handle_in(Publish, channel(#{conn_state => connected})).
|
||||||
|
|
||||||
t_handle_in_qos2_publish(_) ->
|
t_handle_in_qos2_publish(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, 1}] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 1}}] end),
|
||||||
Channel = channel(#{conn_state => connected, session => session()}),
|
Channel = channel(#{conn_state => connected, session => session()}),
|
||||||
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
|
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
|
||||||
|
@ -355,6 +355,57 @@ t_process_unsubscribe(_) ->
|
||||||
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
{[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, #{}, channel()).
|
{[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, #{}, channel()).
|
||||||
|
|
||||||
|
t_quota_qos0(_) ->
|
||||||
|
esockd_limiter:start_link(), Cnter = counters:new(1, []),
|
||||||
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
|
ok = meck:expect(emqx_metrics, inc, fun('packets.publish.dropped') -> counters:add(Cnter, 1, 1) end),
|
||||||
|
ok = meck:expect(emqx_metrics, val, fun('packets.publish.dropped') -> counters:get(Cnter, 1) end),
|
||||||
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
||||||
|
Pub = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
||||||
|
|
||||||
|
M1 = emqx_metrics:val('packets.publish.dropped'),
|
||||||
|
{ok, Chann1} = emqx_channel:handle_in(Pub, Chann),
|
||||||
|
{ok, Chann2} = emqx_channel:handle_in(Pub, Chann1),
|
||||||
|
M1 = emqx_metrics:val('packets.publish.dropped') - 1,
|
||||||
|
{ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
|
||||||
|
{ok, _} = emqx_channel:handle_in(Pub, Chann3),
|
||||||
|
M1 = emqx_metrics:val('packets.publish.dropped') - 1,
|
||||||
|
|
||||||
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||||
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
||||||
|
esockd_limiter:stop().
|
||||||
|
|
||||||
|
t_quota_qos1(_) ->
|
||||||
|
esockd_limiter:start_link(),
|
||||||
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
||||||
|
Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
||||||
|
%% Quota per connections
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub, Chann),
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub, Chann1),
|
||||||
|
{ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3),
|
||||||
|
%% Quota in overall
|
||||||
|
{ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4),
|
||||||
|
esockd_limiter:stop().
|
||||||
|
|
||||||
|
t_quota_qos2(_) ->
|
||||||
|
esockd_limiter:start_link(),
|
||||||
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
||||||
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
||||||
|
Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
||||||
|
Pub2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
|
||||||
|
Pub3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
|
||||||
|
Pub4 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 4, <<"payload">>),
|
||||||
|
%% Quota per connections
|
||||||
|
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Chann1} = emqx_channel:handle_in(Pub1, Chann),
|
||||||
|
{ok, ?PUBREC_PACKET(2, ?RC_QUOTA_EXCEEDED), Chann2} = emqx_channel:handle_in(Pub2, Chann1),
|
||||||
|
{ok, Chann3} = emqx_channel:handle_timeout(ref, reset_quota_flag, Chann2),
|
||||||
|
{ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3),
|
||||||
|
%% Quota in overall
|
||||||
|
{ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4),
|
||||||
|
esockd_limiter:stop().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_deliver
|
%% Test cases for handle_deliver
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -697,3 +748,8 @@ session(InitFields) when is_map(InitFields) ->
|
||||||
emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
|
emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
|
||||||
InitFields).
|
InitFields).
|
||||||
|
|
||||||
|
%% conn: 5/s; overall: 10/s
|
||||||
|
quota() ->
|
||||||
|
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
|
||||||
|
{overall_messages_routing, {10, 1}}]).
|
||||||
|
|
||||||
|
|
|
@ -62,16 +62,16 @@ t_check_conn(_) ->
|
||||||
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
#{conn_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
||||||
|
|
||||||
t_check_overall(_) ->
|
t_check_overall(_) ->
|
||||||
Limiter = emqx_limiter:init(external, [{overall_bytes_in, {100, 1}}]),
|
Limiter = emqx_limiter:init(external, [{overall_messages_routing, {100, 1}}]),
|
||||||
|
|
||||||
{ok, Limiter2} = emqx_limiter:check(#{cnt => 0, oct => 1}, Limiter),
|
{ok, Limiter2} = emqx_limiter:check(#{cnt => 1, oct => 0}, Limiter),
|
||||||
#{overall_bytes_in := #{tokens := 99}} = emqx_limiter:info(Limiter2),
|
#{overall_messages_routing := #{tokens := 99}} = emqx_limiter:info(Limiter2),
|
||||||
|
|
||||||
%% XXX: P = 1/r = 1/100 * 1000 = 10ms ?
|
%% XXX: P = 1/r = 1/100 * 1000 = 10ms ?
|
||||||
{pause, 1000, Limiter3} = emqx_limiter:check(#{cnt => 0, oct => 100}, Limiter),
|
{pause, _, Limiter3} = emqx_limiter:check(#{cnt => 100, oct => 0}, Limiter),
|
||||||
#{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter2),
|
#{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter2),
|
||||||
|
|
||||||
%% XXX: P = 10000/r = 10000/100 * 1000 = 100s ?
|
%% XXX: P = 10000/r = 10000/100 * 1000 = 100s ?
|
||||||
{pause, 1000, Limiter4} = emqx_limiter:check(#{cnt => 0, oct => 10000}, Limiter3),
|
{pause, _, Limiter4} = emqx_limiter:check(#{cnt => 10000, oct => 0}, Limiter3),
|
||||||
#{overall_bytes_in := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
#{overall_messages_routing := #{tokens := 0}} = emqx_limiter:info(Limiter4).
|
||||||
|
|
||||||
|
|
|
@ -252,7 +252,7 @@ t_dispatch(_) ->
|
||||||
Topic = <<"foo">>,
|
Topic = <<"foo">>,
|
||||||
?assertEqual({error, no_subscribers}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})),
|
?assertEqual({error, no_subscribers}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})),
|
||||||
emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
|
emqx:subscribe(Topic, #{qos => 2, share => <<"group1">>}),
|
||||||
?assertEqual(ok, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
|
?assertEqual({ok, 1}, emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
|
||||||
|
|
||||||
% t_unsubscribe(_) ->
|
% t_unsubscribe(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
Loading…
Reference in New Issue