test(retainer): extend test suite for usage by other backends
Part of https://emqx.atlassian.net/browse/EMQX-11922
This commit is contained in:
parent
d56fb22208
commit
08ef2c7b8b
|
@ -157,7 +157,7 @@ t_store_and_clean(_) ->
|
|||
|
||||
ok = emqtt:disconnect(C1).
|
||||
|
||||
t_retain_handling(_) ->
|
||||
t_retain_handling(Config) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
||||
|
@ -173,11 +173,12 @@ t_retain_handling(_) ->
|
|||
?assertEqual(0, length(receive_messages(1))),
|
||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
|
||||
|
||||
emqtt:publish(
|
||||
publish(
|
||||
C1,
|
||||
<<"retained">>,
|
||||
<<"this is a retained message">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
[{qos, 0}, {retain, true}],
|
||||
Config
|
||||
),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||
|
@ -205,7 +206,7 @@ t_retain_handling(_) ->
|
|||
emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||
ok = emqtt:disconnect(C1).
|
||||
|
||||
t_wildcard_subscription(_) ->
|
||||
t_wildcard_subscription(Config) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
emqtt:publish(
|
||||
|
@ -226,17 +227,19 @@ t_wildcard_subscription(_) ->
|
|||
<<"this is a retained message 2">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
),
|
||||
emqtt:publish(
|
||||
publish(
|
||||
C1,
|
||||
<<"/x/y/z">>,
|
||||
<<"this is a retained message 3">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
[{qos, 0}, {retain, true}],
|
||||
Config
|
||||
),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"/+/y/#">>, 0),
|
||||
?assertEqual(4, length(receive_messages(4))),
|
||||
Msgs = receive_messages(4),
|
||||
?assertEqual(4, length(Msgs), #{msgs => Msgs}),
|
||||
|
||||
emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||
emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||
|
@ -244,7 +247,7 @@ t_wildcard_subscription(_) ->
|
|||
emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
|
||||
ok = emqtt:disconnect(C1).
|
||||
|
||||
t_message_expiry(_) ->
|
||||
t_message_expiry(Config) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
Conf#{<<"delivery_rate">> := <<"infinity">>}
|
||||
end,
|
||||
|
@ -279,11 +282,12 @@ t_message_expiry(_) ->
|
|||
<<"don't expire">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
),
|
||||
emqtt:publish(
|
||||
publish(
|
||||
C1,
|
||||
<<"$SYS/retained/4">>,
|
||||
<<"don't expire">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
[{qos, 0}, {retain, true}],
|
||||
Config
|
||||
),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
||||
|
@ -307,14 +311,14 @@ t_message_expiry(_) ->
|
|||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
|
||||
t_message_expiry_2(_) ->
|
||||
t_message_expiry_2(Config) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
Conf#{<<"msg_expiry_interval">> := <<"2s">>}
|
||||
end,
|
||||
Case = fun() ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
|
||||
publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}], Config),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
|
@ -348,7 +352,7 @@ t_table_full(_) ->
|
|||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
|
||||
t_clean(_) ->
|
||||
t_clean(Config) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
emqtt:publish(
|
||||
|
@ -363,11 +367,12 @@ t_clean(_) ->
|
|||
<<"this is a retained message 1">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
),
|
||||
emqtt:publish(
|
||||
publish(
|
||||
C1,
|
||||
<<"retained/test/0">>,
|
||||
<<"this is a retained message 2">>,
|
||||
[{qos, 0}, {retain, true}]
|
||||
[{qos, 0}, {retain, true}],
|
||||
Config
|
||||
),
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||
?assertEqual(3, length(receive_messages(3))),
|
||||
|
@ -871,3 +876,36 @@ make_limiter_json(Rate) ->
|
|||
<<"initial">> => 0,
|
||||
<<"burst">> => <<"0">>
|
||||
}.
|
||||
|
||||
publish(Client, Topic, Payload, Opts, TCConfig) ->
|
||||
PublishOpts = publish_opts(TCConfig),
|
||||
do_publish(Client, Topic, Payload, Opts, PublishOpts).
|
||||
|
||||
publish_opts(TCConfig) ->
|
||||
Timeout = proplists:get_value(publish_wait_timeout, TCConfig, undefined),
|
||||
Predicate =
|
||||
case proplists:get_value(publish_wait_predicate, TCConfig, undefined) of
|
||||
undefined -> undefined;
|
||||
{NEvents, Pred} -> {predicate, {NEvents, Pred, Timeout}};
|
||||
Pred -> {predicate, {1, Pred, Timeout}}
|
||||
end,
|
||||
Sleep =
|
||||
case proplists:get_value(sleep_after_publish, TCConfig, undefined) of
|
||||
undefined -> undefined;
|
||||
Time -> {sleep, Time}
|
||||
end,
|
||||
emqx_maybe:define(Predicate, Sleep).
|
||||
|
||||
do_publish(Client, Topic, Payload, Opts, undefined) ->
|
||||
emqtt:publish(Client, Topic, Payload, Opts);
|
||||
do_publish(Client, Topic, Payload, Opts, {predicate, {NEvents, Predicate, Timeout}}) ->
|
||||
%% Do not delete this clause: it's used by other retainer implementation tests
|
||||
{ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, Timeout),
|
||||
Res = emqtt:publish(Client, Topic, Payload, Opts),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef0),
|
||||
Res;
|
||||
do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
|
||||
%% Do not delete this clause: it's used by other retainer implementation tests
|
||||
Res = emqtt:publish(Client, Topic, Payload, Opts),
|
||||
ct:sleep(Time),
|
||||
Res.
|
||||
|
|
Loading…
Reference in New Issue