test(retainer): fix test cases

This commit is contained in:
Ivan Dyachkov 2024-01-29 08:42:36 +01:00
parent e4b8d79444
commit fa6d65887d
1 changed files with 57 additions and 50 deletions

View File

@ -256,61 +256,67 @@ t_wildcard_subscription(_) ->
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).
t_message_expiry(_) -> t_message_expiry(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), ConfMod = fun(Conf) ->
{ok, _} = emqtt:connect(C1), Conf#{<<"delivery_rate">> := <<"infinity">>}
end,
Case = fun() ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/0">>, <<"retained/0">>,
#{'Message-Expiry-Interval' => 0}, #{'Message-Expiry-Interval' => 0},
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/1">>, <<"retained/1">>,
#{'Message-Expiry-Interval' => 2}, #{'Message-Expiry-Interval' => 2},
<<"expire">>, <<"expire">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/2">>, <<"retained/2">>,
#{'Message-Expiry-Interval' => 5}, #{'Message-Expiry-Interval' => 5},
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/3">>, <<"retained/3">>,
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, C1,
<<"$SYS/retained/4">>, <<"$SYS/retained/4">>,
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
?assertEqual(5, length(receive_messages(5))), ?assertEqual(5, length(receive_messages(5))),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>),
timer:sleep(3000), timer:sleep(3000),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
?assertEqual(4, length(receive_messages(5))), ?assertEqual(4, length(receive_messages(5))),
emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]),
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1)
end,
with_conf(ConfMod, Case).
t_message_expiry_2(_) -> t_message_expiry_2(_) ->
ConfMod = fun(Conf) -> ConfMod = fun(Conf) ->
@ -410,6 +416,7 @@ t_flow_control(_) ->
JsonCfg = make_limiter_json(<<"1/1s">>), JsonCfg = make_limiter_json(<<"1/1s">>),
emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
emqx_retainer:update_config(#{ emqx_retainer:update_config(#{
<<"delivery_rate">> => <<"1/1s">>,
<<"flow_control">> => <<"flow_control">> =>
#{ #{
<<"batch_read_number">> => 1, <<"batch_read_number">> => 1,