From 21e82b953407072f19ce9b53707bc0a4011de626 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 20 Sep 2023 12:55:04 +0400 Subject: [PATCH] test(sessmem): make retry delivery testcase more involved --- apps/emqx/test/emqx_session_mem_SUITE.erl | 62 +++++++++++++++-------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index b535e34a5..7f10635c1 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -413,22 +413,38 @@ t_enqueue_qos0(_) -> ?assertEqual(2, emqx_session_mem:info(mqueue_len, Session1)). t_retry(_) -> - %% 0.1s - RetryIntervalMs = 100, + RetryIntervalMs = 1000, Session = session(#{retry_interval => RetryIntervalMs}), - Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), - {ok, Pubs, Session1} = emqx_session_mem:deliver( - clientinfo(), Delivers, Session + Delivers = enrich( + [ + delivery(?QOS_1, <<"t1">>, <<"expiressoon">>, _Expiry = 1), + delivery(?QOS_2, <<"t2">>), + delivery(?QOS_0, <<"t3">>), + delivery(?QOS_1, <<"t4">>) + ], + Session ), - %% 0.2s - ElapseMs = 200, + {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session), + [_Pub1, Pub2, _Pub3, Pub4] = Pubs, + {ok, _Msg, Session2} = emqx_session_mem:pubrec(get_packet_id(Pub2), Session1), + ElapseMs = 1500, ok = timer:sleep(ElapseMs), - Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs], - {ok, Msgs1T, RetryIntervalMs, Session2} = emqx_session_mem:handle_timeout( - clientinfo(), retry_delivery, Session1 + {ok, PubsRetry, RetryIntervalMs, Session3} = emqx_session_mem:handle_timeout( + clientinfo(), retry_delivery, Session2 ), - ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)), - ?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)). + ?assertEqual( + [ + % Pub1 is expired + {pubrel, get_packet_id(Pub2)}, + % Pub3 is QoS0 + set_duplicate_pub(Pub4) + ], + remove_deliver_flag(PubsRetry) + ), + ?assertEqual( + 2, + emqx_session_mem:info(inflight_cnt, Session3) + ). %%-------------------------------------------------------------------- %% Test cases for takeover/resume @@ -540,7 +556,12 @@ subopts(Init) -> maps:merge(?DEFAULT_SUBOPTS, Init). delivery(QoS, Topic) -> - {deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}. + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + {deliver, Topic, emqx_message:make(test, QoS, Topic, Payload)}. + +delivery(QoS, Topic, Payload, ExpiryInterval) -> + Headers = #{properties => #{'Message-Expiry-Interval' => ExpiryInterval}}, + {deliver, Topic, emqx_message:make(test, QoS, Topic, Payload, #{}, Headers)}. enrich(Delivers, Session) when is_list(Delivers) -> emqx_session:enrich_delivers(clientinfo(), Delivers, Session); @@ -562,18 +583,17 @@ with_ts(Phase, Msg, Ts) -> timestamp = Ts }. +remove_deliver_flag({pubrel, Id}) -> + {pubrel, Id}; remove_deliver_flag({Id, Data}) -> {Id, remove_deliver_flag(Data)}; -remove_deliver_flag(#inflight_data{message = Msg} = Data) -> - Data#inflight_data{message = remove_deliver_flag(Msg)}; remove_deliver_flag(List) when is_list(List) -> lists:map(fun remove_deliver_flag/1, List); remove_deliver_flag(Msg) -> emqx_message:remove_header(deliver_begin_at, Msg). -inflight_data_to_msg({Id, Data}) -> - {Id, inflight_data_to_msg(Data)}; -inflight_data_to_msg(#inflight_data{message = Msg}) -> - Msg; -inflight_data_to_msg(List) when is_list(List) -> - lists:map(fun inflight_data_to_msg/1, List). +set_duplicate_pub({Id, Msg}) -> + {Id, emqx_message:set_flag(dup, Msg)}. + +get_packet_id({Id, _}) -> + Id.