diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
index 372944666..2a4e13731 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl
@@ -43,7 +43,6 @@
%% retry contenxt
%% undefined meaning no retry context or no need to retry
-
, retry_ctx => undefined
| retry_context(token_bucket_limiter()) %% the retry context
, atom => any() %% allow to add other keys
@@ -108,8 +107,6 @@
-import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]).
--elvis([{elvis_style, no_if_expression, disable}]).
-
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@@ -308,18 +305,19 @@ do_reset(Need,
capacity := Capacity} = Limiter) ->
Now = ?NOW,
Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
- Available = erlang:floor(Tokens2),
- if Available >= Need ->
+
+ case erlang:floor(Tokens2) of
+ Available when Available >= Need ->
Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now},
do_check_with_parent_limiter(Need, Limiter2);
- Divisible andalso Available > 0 ->
+ Available when Divisible andalso Available > 0 ->
%% must be allocated here, because may be Need > Capacity
return_pause(Rate,
partial,
fun do_reset/2,
Need - Available,
Limiter#{tokens := 0, lasttime := Now});
- true ->
+ _ ->
return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
end.
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
index b9b25e806..3f313e7e4 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
@@ -53,9 +53,10 @@
, capacity/0
, initial/0
, failure_strategy/0
+ , bucket_name/0
]).
--export_type([limiter_type/0, bucket_name/0, bucket_path/0]).
+-export_type([limiter_type/0, bucket_path/0]).
-import(emqx_schema, [sc/2, map/2]).
-define(UNIT_TIME_IN_MS, 1000).
@@ -65,24 +66,58 @@ namespace() -> limiter.
roots() -> [limiter].
fields(limiter) ->
- [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})}
- , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})}
- , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})}
- , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})}
+ [ {bytes_in, sc(ref(limiter_opts),
+ #{description =>
+ <<"The bytes_in limiter.
"
+ "It is used to limit the inbound bytes rate for this EMQX node."
+ "If the this limiter limit is reached,"
+ "the restricted client will be slow down even be hung for a while.">>
+ })}
+ , {message_in, sc(ref(limiter_opts),
+ #{description =>
+ <<"The message_in limiter.
"
+ "This is used to limit the inbound message numbers for this EMQX node"
+ "If the this limiter limit is reached,"
+ "the restricted client will be slow down even be hung for a while.">>
+ })}
+ , {connection, sc(ref(limiter_opts),
+ #{description =>
+ <<"The connection limiter.
"
+ "This is used to limit the connection rate for this EMQX node"
+ "If the this limiter limit is reached,"
+ "New connections will be refused"
+ >>})}
+ , {message_routing, sc(ref(limiter_opts),
+ #{description =>
+ <<"The message_routing limiter.
"
+ "This is used to limite the deliver rate for this EMQX node"
+ "If the this limiter limit is reached,"
+ "New publish will be refused"
+ >>
+ })}
, {batch, sc(ref(limiter_opts),
- #{description => <<"Internal batch operation limiter">>})}
+ #{description => <<"The batch limiter.
"
+ "This is used for EMQX internal batch operation"
+ "e.g. limite the retainer's deliver rate"
+ >>
+ })}
];
fields(limiter_opts) ->
- [ {rate, sc(rate(), #{default => "infinity"})}
- , {burst, sc(burst_rate(), #{default => "0/0s"})}
- , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})}
+ [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}
+ , {burst, sc(burst_rate(),
+ #{default => "0/0s",
+ desc => "The burst, This value is based on rate."
+ "this value + rate = the maximum limit that can be achieved when limiter burst"
+ })}
+ , {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
];
fields(bucket_opts) ->
- [ {rate, sc(rate(), #{})}
- , {capacity, sc(capacity(), #{})}
- , {initial, sc(initial(), #{default => "0"})}
+ [ {rate, sc(rate(), #{desc => "The rate for this bucket"})}
+ , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})}
+ , {initial, sc(initial(), #{default => "0",
+ desc => "The initial number of tokens for this bucket"})}
, {per_client, sc(ref(client_bucket),
#{default => #{},
desc => "The rate limit for each user of the bucket,"
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
index 896792a32..84688ba38 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
@@ -1,4 +1,4 @@
-%--------------------------------------------------------------------
+%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index a450cc245..35012acc7 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1197,7 +1197,7 @@ base_listener() ->
#{ default => 'default'
})}
, {"limiter",
- sc(map("ratelimit's type", atom()), #{default => #{}})}
+ sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})}
].
%% utils
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
index 6121e39b6..093c6fcc4 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
@@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) ->
#{type => string, example => <<"force">>};
typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0M">>};
+typename_to_spec("bucket_name()", _Mod) ->
+ #{type => string, example => <<"retainer">>};
typename_to_spec(Name, Mod) ->
Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod),
diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf
index 05cc1dcd0..051f44940 100644
--- a/apps/emqx_retainer/etc/emqx_retainer.conf
+++ b/apps/emqx_retainer/etc/emqx_retainer.conf
@@ -57,8 +57,15 @@ retainer {
## Default: 0
batch_deliver_number = 0
- ## deliver limiter bucket
- limiter.batch = retainer
+ ## The rate limiter name for retained messages delivery.
+ ## In order to avoid delivering too many messages to the client at once, which may cause the client
+ ## to block or crash, or message dropped due to exceeding the size of the message queue. We need
+ ## to specify a rate limiter for the retained messages delivery to the client.
+ ##
+ ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
+ ## You can remove this field if you don't want any limit
+ ## Default: retainer
+ batch_deliver_limiter = retainer
}
## Maximum retained message size.
diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
index c0e9ad42f..6eb5457b7 100644
--- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
@@ -85,8 +85,8 @@ start_link(Pool, Id) ->
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
- LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
- Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
+ BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
+ Limiter = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
%%--------------------------------------------------------------------
@@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) ->
- LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
- Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
+ BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
+ Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) ->
diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl
index 492a89021..12189c737 100644
--- a/apps/emqx_retainer/src/emqx_retainer_schema.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl
@@ -29,7 +29,7 @@ fields(mnesia_config) ->
fields(flow_control) ->
[ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
, {batch_deliver_number, sc(range(0, 1000), 0)}
- , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})}
+ , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)}
].
%%--------------------------------------------------------------------
diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
index f143734fc..9e957f214 100644
--- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
@@ -36,7 +36,7 @@ retainer {
flow_control {
batch_read_number = 0
batch_deliver_number = 0
- limiter.batch = retainer
+ batch_deliver_limiter = retainer
}
backend {
type = built_in_database
@@ -295,7 +295,7 @@ t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
- <<"limiter">> => #{<<"batch">> => retainer}}}),
+ <<"batch_deliver_limiter">> => retainer}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(