From 03c7b84e89e672edf816e8f55f4663f0e71cba32 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 23 May 2023 15:45:25 +0800 Subject: [PATCH 1/2] feat(retainer): add a field `deliver_rate` to limit the maximum delivery rate --- .../src/emqx_retainer_schema.erl | 35 +++++++++++- .../test/emqx_retainer_SUITE.erl | 55 +++++++++++++++++++ rel/i18n/emqx_retainer_schema.hocon | 2 + 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 74f1e2371..823183cc3 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -32,7 +32,13 @@ namespace() -> "retainer". -roots() -> ["retainer"]. +roots() -> + [ + {"retainer", + hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{ + converter => fun retainer_converter/2 + })} + ]. fields("retainer") -> [ @@ -68,6 +74,11 @@ fields("retainer") -> stop_publish_clear_msg, false )}, + {deliver_rate, + ?HOCON( + emqx_limiter_schema:rate(), + #{required => false, desc => ?DESC(deliver_rate), example => <<"1000/s">>} + )}, {backend, backend_config()} ]; fields(mnesia_config) -> @@ -97,7 +108,7 @@ fields(flow_control) -> )}, {batch_deliver_number, sc( - range(0, 1000), + non_neg_integer(), batch_deliver_number, 0 )}, @@ -173,3 +184,23 @@ check_duplicate(List) -> false -> ?INVALID_SPEC(unique_index_spec_limited); true -> ok end. + +retainer_converter(#{<<"deliver_rate">> := <<"infinity">>} = Conf, _Opts) -> + Conf#{ + <<"flow_control">> => #{ + <<"batch_read_number">> => 0, + <<"batch_deliver_number">> => 0 + } + }; +retainer_converter(#{<<"deliver_rate">> := RateStr} = Conf, _Opts) -> + {ok, RateNum} = emqx_limiter_schema:to_rate(RateStr), + RawRate = erlang:floor(RateNum * 1000 / emqx_limiter_schema:default_period()), + Control = #{ + <<"batch_read_number">> => RawRate, + <<"batch_deliver_number">> => RawRate, + %% Set the maximum delivery rate per session + <<"batch_deliver_limiter">> => #{<<"client">> => #{<<"rate">> => RateStr}} + }, + Conf#{<<"flow_control">> => Control}; +retainer_converter(Conf, _Opts) -> + Conf. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index c90ec6b2b..bce62aa25 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -706,6 +706,61 @@ t_deliver_when_banned(_) -> {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), ok = emqtt:disconnect(C1). +t_compatibility_for_deliver_rate(_) -> + Parser = fun(Conf) -> + {ok, RawConf} = hocon:binary(Conf, #{format => map}), + hocon_tconf:check_plain(emqx_retainer_schema, RawConf, #{ + required => false, atom_key => false + }) + end, + Infinity = <<"retainer.deliver_rate = \"infinity\"">>, + ?assertMatch( + #{ + <<"retainer">> := + #{ + <<"flow_control">> := #{ + <<"batch_deliver_number">> := 0, + <<"batch_read_number">> := 0, + <<"batch_deliver_limiter">> := #{<<"rate">> := infinity} + } + } + }, + Parser(Infinity) + ), + + R1 = <<"retainer.deliver_rate = \"1000/s\"">>, + ?assertMatch( + #{ + <<"retainer">> := + #{ + <<"flow_control">> := #{ + <<"batch_deliver_number">> := 1000, + <<"batch_read_number">> := 1000, + <<"batch_deliver_limiter">> := #{<<"client">> := #{<<"rate">> := 100.0}} + } + } + }, + Parser(R1) + ), + + R2 = << + "retainer{deliver_rate = \"1000/s\"", + "flow_control.batch_deliver_limiter.rate = \"500/s\"}" + >>, + ?assertMatch( + #{ + <<"retainer">> := + #{ + <<"flow_control">> := #{ + <<"batch_deliver_number">> := 1000, + <<"batch_read_number">> := 1000, + <<"batch_deliver_limiter">> := #{<<"client">> := #{<<"rate">> := 100.0}} + } + } + }, + Parser(R2) + ). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/rel/i18n/emqx_retainer_schema.hocon b/rel/i18n/emqx_retainer_schema.hocon index 9b2905da1..45696534d 100644 --- a/rel/i18n/emqx_retainer_schema.hocon +++ b/rel/i18n/emqx_retainer_schema.hocon @@ -46,4 +46,6 @@ whether to continue to publish the message. See: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038""" +deliver_rate.desc: +"""The maximum rate of delivering retain messages""" } From a258ef32e5b6bbc252bc4f0d9b82fdadd0d0fe02 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 23 May 2023 15:53:33 +0800 Subject: [PATCH 2/2] chore: update retainer app version && changes --- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- changes/ce/feat-10782.en.md | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 changes/ce/feat-10782.en.md diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 7bfc8ee4e..9cd7ba09d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.12"}, + {vsn, "5.0.13"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/changes/ce/feat-10782.en.md b/changes/ce/feat-10782.en.md new file mode 100644 index 000000000..d59971ffa --- /dev/null +++ b/changes/ce/feat-10782.en.md @@ -0,0 +1,2 @@ +Added a new `deliver_rate` option to the retainer configuration, which can limit the maximum delivery rate per session in the retainer. +