From 9bdebabdbc1bf0bb063cca03bcd1460bd534522b Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 2 Mar 2022 11:34:19 +0800 Subject: [PATCH] fix(retainer): fix/add some comment --- apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf | 5 +++++ apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl | 4 +++- apps/emqx_retainer/etc/emqx_retainer.conf | 6 ++++-- apps/emqx_retainer/src/emqx_retainer_dispatcher.erl | 7 +++++-- apps/emqx_retainer/src/emqx_retainer_schema.erl | 2 +- apps/emqx_retainer/test/emqx_retainer_SUITE.erl | 4 ++-- apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl | 2 +- 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 2f57d6bc6..4c1f1b7fb 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -3,6 +3,7 @@ ##-------------------------------------------------------------------- limiter { + ## rate limiter for message publish bytes_in { bucket.default { aggregated.rate = infinity @@ -12,6 +13,7 @@ limiter { } } + ## rate limiter for message publish message_in { bucket.default { aggregated.rate = infinity @@ -21,6 +23,7 @@ limiter { } } + ## connection rate limiter connection { bucket.default { aggregated.rate = infinity @@ -30,6 +33,7 @@ limiter { } } + ## rate limiter for message deliver message_routing { bucket.default { aggregated.rate = infinity @@ -39,6 +43,7 @@ limiter { } } + ## Some functions that don't need to use global and zone scope, them can shared use this type shared { bucket.retainer { aggregated.rate = infinity diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 288f85dcf..d4541f49d 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -68,7 +68,9 @@ fields(limiter) -> , {connection, sc(ref(limiter_opts), #{})} , {message_routing, sc(ref(limiter_opts), #{})} , {shared, sc(ref(shared_limiter_opts), - #{description => <<"Some functions that do not need to use global and zone scope, them can shared use this type">>})} + #{description => + <<"Some functions that do not need to use global and zone scope," + "them can shared use this type">>})} ]; fields(limiter_opts) -> diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 2bec041c2..e561bc52f 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -40,17 +40,19 @@ retainer { ## When a client subscribe to a wildcard topic, may many retained messages will be loaded. ## If you don't want these data loaded to the memory all at once, you can use this to control. ## The processing flow: - ## load max_read_number retained message from storage -> + ## load batch_read_number retained message from storage -> ## deliver -> ## repeat this, until all retianed messages are delivered ## flow_control { - ## The messages number per read from storage. 0 means no limit + ## The messages batch number per read from storage. 0 means no limit ## ## Default: 0 batch_read_number = 0 ## The number of retained message can be delivered per batch + ## Range: [0, 1000] + ## Note: If this value is too large, it may cause difficulty in applying for the token of deliver ## ## Default: 0 batch_deliver_number = 0 diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 8bcc80f60..2f4c4b594 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -22,7 +22,7 @@ -include_lib("emqx/include/logger.hrl"). %% API --export([start_link/2 +-export([ start_link/2 , dispatch/2 , refresh_limiter/0 ]). @@ -241,7 +241,10 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> do_deliver(ToDelivers, Pid, Topic), do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); {drop, _} = Drop -> - ?SLOG(error, #{msg => "the retainer deliver failed because the required quota could not be obtained"}), + ?SLOG(error, #{msg => "retained_message_dropped", + reason => "reached_ratelimit", + dropped_count => length(ToDelivers) + }), Drop end. diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 25a2dda28..395f0e363 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -28,7 +28,7 @@ fields(mnesia_config) -> fields(flow_control) -> [ {batch_read_number, sc(integer(), 0, fun is_pos_integer/1)} - , {batch_deliver_number, sc(range(0, 50), 0)} + , {batch_deliver_number, sc(range(0, 1000), 0)} , {limiter_bucket_name, sc(atom(), retainer)} ]. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index ce3e50c64..bd144fd84 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -57,7 +57,7 @@ init_per_suite(Config) -> meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok), - base_conf(), + load_base_conf(), emqx_ratelimiter_SUITE:base_conf(), emqx_common_test_helpers:start_apps([emqx_retainer]), Config. @@ -84,7 +84,7 @@ end_per_testcase(_, Config) -> end, Config. -base_conf() -> +load_base_conf() -> ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF). %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index 852631279..157ba4a23 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -24,7 +24,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_retainer_SUITE:base_conf(), + emqx_retainer_SUITE:load_base_conf(), %% Meck emqtt ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), %% Start Apps