From beba7c9692294868c2d673d1826df207716a483a Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 18 Mar 2022 14:45:06 +0800 Subject: [PATCH] fix(limiter): improve code style and description --- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 12 ++-- .../emqx_limiter/src/emqx_limiter_schema.erl | 59 +++++++++++++++---- .../emqx_limiter/src/emqx_limiter_server.erl | 2 +- apps/emqx/src/emqx_schema.erl | 2 +- .../src/emqx_dashboard_swagger.erl | 2 + apps/emqx_retainer/etc/emqx_retainer.conf | 11 +++- .../src/emqx_retainer_dispatcher.erl | 8 +-- .../src/emqx_retainer_schema.erl | 2 +- .../test/emqx_retainer_SUITE.erl | 4 +- 9 files changed, 72 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 372944666..2a4e13731 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -43,7 +43,6 @@ %% retry contenxt %% undefined meaning no retry context or no need to retry - , retry_ctx => undefined | retry_context(token_bucket_limiter()) %% the retry context , atom => any() %% allow to add other keys @@ -108,8 +107,6 @@ -import(emqx_limiter_decimal, [sub/2, mul/2, floor_div/2, add/2]). --elvis([{elvis_style, no_if_expression, disable}]). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -308,18 +305,19 @@ do_reset(Need, capacity := Capacity} = Limiter) -> Now = ?NOW, Tokens2 = apply_elapsed_time(Rate, Now - LastTime, Tokens, Capacity), - Available = erlang:floor(Tokens2), - if Available >= Need -> + + case erlang:floor(Tokens2) of + Available when Available >= Need -> Limiter2 = Limiter#{tokens := Tokens2, lasttime := Now}, do_check_with_parent_limiter(Need, Limiter2); - Divisible andalso Available > 0 -> + Available when Divisible andalso Available > 0 -> %% must be allocated here, because may be Need > Capacity return_pause(Rate, partial, fun do_reset/2, Need - Available, Limiter#{tokens := 0, lasttime := Now}); - true -> + _ -> return_pause(Rate, pause, fun do_reset/2, Need, Limiter) end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index b9b25e806..3f313e7e4 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -53,9 +53,10 @@ , capacity/0 , initial/0 , failure_strategy/0 + , bucket_name/0 ]). --export_type([limiter_type/0, bucket_name/0, bucket_path/0]). +-export_type([limiter_type/0, bucket_path/0]). -import(emqx_schema, [sc/2, map/2]). -define(UNIT_TIME_IN_MS, 1000). @@ -65,24 +66,58 @@ namespace() -> limiter. roots() -> [limiter]. fields(limiter) -> - [ {bytes_in, sc(ref(limiter_opts), #{description => <<"Limiter of message publish bytes">>})} - , {message_in, sc(ref(limiter_opts), #{description => <<"Message publish limiter">>})} - , {connection, sc(ref(limiter_opts), #{description => <<"Connection limiter">>})} - , {message_routing, sc(ref(limiter_opts), #{description => <<"Deliver limiter">>})} + [ {bytes_in, sc(ref(limiter_opts), + #{description => + <<"The bytes_in limiter.
" + "It is used to limit the inbound bytes rate for this EMQX node." + "If the this limiter limit is reached," + "the restricted client will be slow down even be hung for a while.">> + })} + , {message_in, sc(ref(limiter_opts), + #{description => + <<"The message_in limiter.
" + "This is used to limit the inbound message numbers for this EMQX node" + "If the this limiter limit is reached," + "the restricted client will be slow down even be hung for a while.">> + })} + , {connection, sc(ref(limiter_opts), + #{description => + <<"The connection limiter.
" + "This is used to limit the connection rate for this EMQX node" + "If the this limiter limit is reached," + "New connections will be refused" + >>})} + , {message_routing, sc(ref(limiter_opts), + #{description => + <<"The message_routing limiter.
" + "This is used to limite the deliver rate for this EMQX node" + "If the this limiter limit is reached," + "New publish will be refused" + >> + })} , {batch, sc(ref(limiter_opts), - #{description => <<"Internal batch operation limiter">>})} + #{description => <<"The batch limiter.
" + "This is used for EMQX internal batch operation" + "e.g. limite the retainer's deliver rate" + >> + })} ]; fields(limiter_opts) -> - [ {rate, sc(rate(), #{default => "infinity"})} - , {burst, sc(burst_rate(), #{default => "0/0s"})} - , {bucket, sc(map("bucket name", ref(bucket_opts)), #{})} + [ {rate, sc(rate(), #{default => "infinity", desc => "The rate"})} + , {burst, sc(burst_rate(), + #{default => "0/0s", + desc => "The burst, This value is based on rate." + "this value + rate = the maximum limit that can be achieved when limiter burst" + })} + , {bucket, sc(map("bucket name", ref(bucket_opts)), #{desc => "Buckets config"})} ]; fields(bucket_opts) -> - [ {rate, sc(rate(), #{})} - , {capacity, sc(capacity(), #{})} - , {initial, sc(initial(), #{default => "0"})} + [ {rate, sc(rate(), #{desc => "The rate for this bucket"})} + , {capacity, sc(capacity(), #{desc => "The maximum number of tokens for this bucket"})} + , {initial, sc(initial(), #{default => "0", + desc => "The initial number of tokens for this bucket"})} , {per_client, sc(ref(client_bucket), #{default => #{}, desc => "The rate limit for each user of the bucket," diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 896792a32..84688ba38 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -1,4 +1,4 @@ -%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index a450cc245..35012acc7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1197,7 +1197,7 @@ base_listener() -> #{ default => 'default' })} , {"limiter", - sc(map("ratelimit's type", atom()), #{default => #{}})} + sc(map("ratelimit's type", emqx_limiter_schema:bucket_name()), #{default => #{}})} ]. %% utils diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6121e39b6..093c6fcc4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -493,6 +493,8 @@ typename_to_spec("failure_strategy()", _Mod) -> #{type => string, example => <<"force">>}; typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0M">>}; +typename_to_spec("bucket_name()", _Mod) -> + #{type => string, example => <<"retainer">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 05cc1dcd0..051f44940 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -57,8 +57,15 @@ retainer { ## Default: 0 batch_deliver_number = 0 - ## deliver limiter bucket - limiter.batch = retainer + ## The rate limiter name for retained messages delivery. + ## In order to avoid delivering too many messages to the client at once, which may cause the client + ## to block or crash, or message dropped due to exceeding the size of the message queue. We need + ## to specify a rate limiter for the retained messages delivery to the client. + ## + ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`. + ## You can remove this field if you don't want any limit + ## Default: retainer + batch_deliver_limiter = retainer } ## Maximum retained message size. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index c0e9ad42f..6eb5457b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -85,8 +85,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), - Limiter = emqx_limiter_server:connect(batch, LimiterCfg), + BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), + Limiter = emqx_limiter_server:connect(batch, BucketName), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -124,8 +124,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {noreply, State#{limiter := Limiter2}}; handle_cast(refresh_limiter, State) -> - LimiterCfg = emqx:get_config([retainer, flow_control, limiter]), - Limiter = emqx_limiter_server:connect(batch, LimiterCfg), + BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), + Limiter = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 492a89021..12189c737 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -29,7 +29,7 @@ fields(mnesia_config) -> fields(flow_control) -> [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} , {batch_deliver_number, sc(range(0, 1000), 0)} - , {limiter, sc(emqx_schema:map("limiter's type", atom()), #{})} + , {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), undefined)} ]. %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index f143734fc..9e957f214 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -36,7 +36,7 @@ retainer { flow_control { batch_read_number = 0 batch_deliver_number = 0 - limiter.batch = retainer + batch_deliver_limiter = retainer } backend { type = built_in_database @@ -295,7 +295,7 @@ t_flow_control(_) -> emqx_retainer:update_config(#{<<"flow_control">> => #{<<"batch_read_number">> => 1, <<"batch_deliver_number">> => 1, - <<"limiter">> => #{<<"batch">> => retainer}}}), + <<"batch_deliver_limiter">> => retainer}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish(