fix(limiter): improve code style and description

This commit is contained in:
firest 2022-03-18 14:45:06 +08:00
parent d28b34f0d1
commit beba7c9692
9 changed files with 72 additions and 30 deletions

View File

@ -43,7 +43,6 @@
%% retry contenxt
%% undefined meaning no retry context or no need to retry
, retry_ctx => undefined
| retry_context(token_bucket_limiter()) %% the retry context
, atom => any() %% allow to add other keys
@ -108,8 +107,6 @@
-import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -308,18 +305,19 @@ do_reset(Need,
capacity := Capacity} = Limiter) ->
Now = ?NOW,
Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity),
Available = erlang:floor(Tokens2),
if Available >= Need ->
case erlang:floor(Tokens2) of
Available when Available >= Need ->
Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now},
do_check_with_parent_limiter(Need, Limiter2);
Divisible andalso Available > 0 ->
Available when Divisible andalso Available > 0 ->
%% must be allocated here, because may be Need > Capacity
return_pause(Rate,
partial,
fun do_reset/2,
Need - Available,
Limiter#{tokens := 0, lasttime := Now});
true ->
_ ->
return_pause(Rate, pause, fun do_reset/2, Need, Limiter)
end.

View File

@ -53,9 +53,10 @@
, capacity/0
, initial/0
, failure_strategy/0
, bucket_name/0
]).
-export_type([limiter_type/0, bucket_name/0, bucket_path/0]).
-export_type([limiter_type/0, bucket_path/0]).
-import(emqx_schema, [sc/2, map/2]).
-define(UNIT_TIME_IN_MS, 1000).
@ -65,24 +66,58 @@ namespace() -> limiter.
roots() -> [limiter].
fields(limiter) ->
[ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})}
, {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})}
, {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})}
, {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})}
[ {bytes_in, sc(ref(limiter_opts),
#{description =>
<<"The bytes_in limiter.<br>"
"It is used to limit the inbound bytes rate for this EMQX node."
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while.">>
})}
, {message_in, sc(ref(limiter_opts),
#{description =>
<<"The message_in limiter.<br>"
"This is used to limit the inbound message numbers for this EMQX node"
"If the this limiter limit is reached,"
"the restricted client will be slow down even be hung for a while.">>
})}
, {connection, sc(ref(limiter_opts),
#{description =>
<<"The connection limiter.<br>"
"This is used to limit the connection rate for this EMQX node"
"If the this limiter limit is reached,"
"New connections will be refused"
>>})}
, {message_routing, sc(ref(limiter_opts),
#{description =>
<<"The message_routing limiter.<br>"
"This is used to limite the deliver rate for this EMQX node"
"If the this limiter limit is reached,"
"New publish will be refused"
>>
})}
, {batch, sc(ref(limiter_opts),
#{description => <<"Internal batch operation limiter">>})}
#{description => <<"The batch limiter.<br>"
"This is used for EMQX internal batch operation"
"e.g. limite the retainer's deliver rate"
>>
})}
];
fields(limiter_opts) ->
[ {rate, sc(rate(), #{default => "infinity"})}
, {burst, sc(burst_rate(), #{default => "0/0s"})}
, {bucket, sc(map("bucket name", ref(bucket_opts)), #{})}
[ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})}
, {burst, sc(burst_rate(),
#{default => "0/0s",
desc => "The burst, This value is based on rate."
"this value + rate = the maximum limit that can be achieved when limiter burst"
})}
, {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})}
];
fields(bucket_opts) ->
[ {rate, sc(rate(), #{})}
, {capacity, sc(capacity(), #{})}
, {initial, sc(initial(), #{default => "0"})}
[ {rate, sc(rate(), #{desc => "The rate for this bucket"})}
, {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})}
, {initial, sc(initial(), #{default => "0",
desc => "The initial number of tokens for this bucket"})}
, {per_client, sc(ref(client_bucket),
#{default => #{},
desc => "The rate limit for each user of the bucket,"

View File

@ -1,4 +1,4 @@
%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");

View File

@ -1197,7 +1197,7 @@ base_listener() ->
#{ default => 'default'
})}
, {"limiter",
sc(map("ratelimit's type", atom()), #{default => #{}})}
sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})}
].
%% utils

View File

@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) ->
#{type => string, example => <<"force">>};
typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0M">>};
typename_to_spec("bucket_name()", _Mod) ->
#{type => string, example => <<"retainer">>};
typename_to_spec(Name, Mod) ->
Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod),

View File

@ -57,8 +57,15 @@ retainer {
## Default: 0
batch_deliver_number = 0
## deliver limiter bucket
limiter.batch = retainer
## The rate limiter name for retained messages delivery.
## In order to avoid delivering too many messages to the client at once, which may cause the client
## to block or crash, or message dropped due to exceeding the size of the message queue. We need
## to specify a rate limiter for the retained messages delivery to the client.
##
## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
## You can remove this field if you don't want any limit
## Default: retainer
batch_deliver_limiter = retainer
}
## Maximum retained message size.

View File

@ -85,8 +85,8 @@ start_link(Pool, Id) ->
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
%%--------------------------------------------------------------------
@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) ->
LimiterCfg = emqx:get_config([retainer, flow_control, limiter]),
Limiter = emqx_limiter_server:connect(batch, LimiterCfg),
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) ->

View File

@ -29,7 +29,7 @@ fields(mnesia_config) ->
fields(flow_control) ->
[ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)}
, {batch_deliver_number, sc(range(0, 1000), 0)}
, {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})}
, {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)}
].
%%--------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ retainer {
flow_control {
batch_read_number = 0
batch_deliver_number = 0
limiter.batch = retainer
batch_deliver_limiter = retainer
}
backend {
type = built_in_database
@ -295,7 +295,7 @@ t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
<<"limiter">> => #{<<"batch">> => retainer}}}),
<<"batch_deliver_limiter">> => retainer}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(