diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 1c5d8e55f..47540b0ec 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -81,6 +81,7 @@ fields("retainer") -> #{ required => false, desc => ?DESC(delivery_rate), + default => <<"1000/s">>, example => <<"1000/s">>, aliases => [deliver_rate] } diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 2818f6bfa..c76ba90c6 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -256,61 +256,67 @@ t_wildcard_subscription(_) -> ok = emqtt:disconnect(C1). t_message_expiry(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), + ConfMod = fun(Conf) -> + Conf#{<<"delivery_rate">> := <<"infinity">>} + end, + Case = fun() -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), - emqtt:publish( - C1, - <<"retained/0">>, - #{'Message-Expiry-Interval' => 0}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/1">>, - #{'Message-Expiry-Interval' => 2}, - <<"expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/2">>, - #{'Message-Expiry-Interval' => 5}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/3">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"$SYS/retained/4">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), + emqtt:publish( + C1, + <<"retained/0">>, + #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/1">>, + #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/2">>, + #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(5, length(receive_messages(5))), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(5, length(receive_messages(5))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), - timer:sleep(3000), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(4, length(receive_messages(5))), + timer:sleep(3000), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(4, length(receive_messages(5))), - emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), - ok = emqtt:disconnect(C1). + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). t_message_expiry_2(_) -> ConfMod = fun(Conf) -> @@ -410,6 +416,7 @@ t_flow_control(_) -> JsonCfg = make_limiter_json(<<"1/1s">>), emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, <<"flow_control">> => #{ <<"batch_read_number">> => 1,