From 08ef2c7b8b91cfd1b55627444ac8137fea95b0a8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 1 Mar 2024 17:12:54 -0300 Subject: [PATCH] test(retainer): extend test suite for usage by other backends Part of https://emqx.atlassian.net/browse/EMQX-11922 --- .../test/emqx_retainer_SUITE.erl | 68 +++++++++++++++---- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index f42255832..d4ad43907 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -157,7 +157,7 @@ t_store_and_clean(_) -> ok = emqtt:disconnect(C1). -t_retain_handling(_) -> +t_retain_handling(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), @@ -173,11 +173,12 @@ t_retain_handling(_) -> ?assertEqual(0, length(receive_messages(1))), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), - emqtt:publish( + publish( C1, <<"retained">>, <<"this is a retained message">>, - [{qos, 0}, {retain, true}] + [{qos, 0}, {retain, true}], + Config ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), @@ -205,7 +206,7 @@ t_retain_handling(_) -> emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]), ok = emqtt:disconnect(C1). -t_wildcard_subscription(_) -> +t_wildcard_subscription(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -226,17 +227,19 @@ t_wildcard_subscription(_) -> <<"this is a retained message 2">>, [{qos, 0}, {retain, true}] ), - emqtt:publish( + publish( C1, <<"/x/y/z">>, <<"this is a retained message 3">>, - [{qos, 0}, {retain, true}] + [{qos, 0}, {retain, true}], + Config ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"/+/y/#">>, 0), - ?assertEqual(4, length(receive_messages(4))), + Msgs = receive_messages(4), + ?assertEqual(4, length(Msgs), #{msgs => Msgs}), emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), @@ -244,7 +247,7 @@ t_wildcard_subscription(_) -> emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]), ok = emqtt:disconnect(C1). -t_message_expiry(_) -> +t_message_expiry(Config) -> ConfMod = fun(Conf) -> Conf#{<<"delivery_rate">> := <<"infinity">>} end, @@ -279,11 +282,12 @@ t_message_expiry(_) -> <<"don't expire">>, [{qos, 0}, {retain, true}] ), - emqtt:publish( + publish( C1, <<"$SYS/retained/4">>, <<"don't expire">>, - [{qos, 0}, {retain, true}] + [{qos, 0}, {retain, true}], + Config ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), @@ -307,14 +311,14 @@ t_message_expiry(_) -> end, with_conf(ConfMod, Case). -t_message_expiry_2(_) -> +t_message_expiry_2(Config) -> ConfMod = fun(Conf) -> Conf#{<<"msg_expiry_interval">> := <<"2s">>} end, Case = fun() -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]), + publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}], Config), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -348,7 +352,7 @@ t_table_full(_) -> end, with_conf(ConfMod, Case). -t_clean(_) -> +t_clean(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -363,11 +367,12 @@ t_clean(_) -> <<"this is a retained message 1">>, [{qos, 0}, {retain, true}] ), - emqtt:publish( + publish( C1, <<"retained/test/0">>, <<"this is a retained message 2">>, - [{qos, 0}, {retain, true}] + [{qos, 0}, {retain, true}], + Config ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), @@ -871,3 +876,36 @@ make_limiter_json(Rate) -> <<"initial">> => 0, <<"burst">> => <<"0">> }. + +publish(Client, Topic, Payload, Opts, TCConfig) -> + PublishOpts = publish_opts(TCConfig), + do_publish(Client, Topic, Payload, Opts, PublishOpts). + +publish_opts(TCConfig) -> + Timeout = proplists:get_value(publish_wait_timeout, TCConfig, undefined), + Predicate = + case proplists:get_value(publish_wait_predicate, TCConfig, undefined) of + undefined -> undefined; + {NEvents, Pred} -> {predicate, {NEvents, Pred, Timeout}}; + Pred -> {predicate, {1, Pred, Timeout}} + end, + Sleep = + case proplists:get_value(sleep_after_publish, TCConfig, undefined) of + undefined -> undefined; + Time -> {sleep, Time} + end, + emqx_maybe:define(Predicate, Sleep). + +do_publish(Client, Topic, Payload, Opts, undefined) -> + emqtt:publish(Client, Topic, Payload, Opts); +do_publish(Client, Topic, Payload, Opts, {predicate, {NEvents, Predicate, Timeout}}) -> + %% Do not delete this clause: it's used by other retainer implementation tests + {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, Timeout), + Res = emqtt:publish(Client, Topic, Payload, Opts), + {ok, _} = snabbkaffe:receive_events(SRef0), + Res; +do_publish(Client, Topic, Payload, Opts, {sleep, Time}) -> + %% Do not delete this clause: it's used by other retainer implementation tests + Res = emqtt:publish(Client, Topic, Payload, Opts), + ct:sleep(Time), + Res.