test(sessmem): make retry delivery testcase more involved

This commit is contained in:
Andrew Mayorov 2023-09-20 12:55:04 +04:00
parent 69889d14a3
commit 21e82b9534
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 41 additions and 21 deletions

View File

@ -413,22 +413,38 @@ t_enqueue_qos0(_) ->
?assertEqual(2, emqx_session_mem:info(mqueue_len, Session1)). ?assertEqual(2, emqx_session_mem:info(mqueue_len, Session1)).
t_retry(_) -> t_retry(_) ->
%% 0.1s RetryIntervalMs = 1000,
RetryIntervalMs = 100,
Session = session(#{retry_interval => RetryIntervalMs}), Session = session(#{retry_interval => RetryIntervalMs}),
Delivers = enrich([delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), Delivers = enrich(
{ok, Pubs, Session1} = emqx_session_mem:deliver( [
clientinfo(), Delivers, Session delivery(?QOS_1, <<"t1">>, <<"expiressoon">>, _Expiry = 1),
delivery(?QOS_2, <<"t2">>),
delivery(?QOS_0, <<"t3">>),
delivery(?QOS_1, <<"t4">>)
],
Session
), ),
%% 0.2s {ok, Pubs, Session1} = emqx_session_mem:deliver(clientinfo(), Delivers, Session),
ElapseMs = 200, [_Pub1, Pub2, _Pub3, Pub4] = Pubs,
{ok, _Msg, Session2} = emqx_session_mem:pubrec(get_packet_id(Pub2), Session1),
ElapseMs = 1500,
ok = timer:sleep(ElapseMs), ok = timer:sleep(ElapseMs),
Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs], {ok, PubsRetry, RetryIntervalMs, Session3} = emqx_session_mem:handle_timeout(
{ok, Msgs1T, RetryIntervalMs, Session2} = emqx_session_mem:handle_timeout( clientinfo(), retry_delivery, Session2
clientinfo(), retry_delivery, Session1
), ),
?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)), ?assertEqual(
?assertEqual(2, emqx_session_mem:info(inflight_cnt, Session2)). [
% Pub1 is expired
{pubrel, get_packet_id(Pub2)},
% Pub3 is QoS0
set_duplicate_pub(Pub4)
],
remove_deliver_flag(PubsRetry)
),
?assertEqual(
2,
emqx_session_mem:info(inflight_cnt, Session3)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for takeover/resume %% Test cases for takeover/resume
@ -540,7 +556,12 @@ subopts(Init) ->
maps:merge(?DEFAULT_SUBOPTS, Init). maps:merge(?DEFAULT_SUBOPTS, Init).
delivery(QoS, Topic) -> delivery(QoS, Topic) ->
{deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}. Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
{deliver, Topic, emqx_message:make(test, QoS, Topic, Payload)}.
delivery(QoS, Topic, Payload, ExpiryInterval) ->
Headers = #{properties => #{'Message-Expiry-Interval' => ExpiryInterval}},
{deliver, Topic, emqx_message:make(test, QoS, Topic, Payload, #{}, Headers)}.
enrich(Delivers, Session) when is_list(Delivers) -> enrich(Delivers, Session) when is_list(Delivers) ->
emqx_session:enrich_delivers(clientinfo(), Delivers, Session); emqx_session:enrich_delivers(clientinfo(), Delivers, Session);
@ -562,18 +583,17 @@ with_ts(Phase, Msg, Ts) ->
timestamp = Ts timestamp = Ts
}. }.
remove_deliver_flag({pubrel, Id}) ->
{pubrel, Id};
remove_deliver_flag({Id, Data}) -> remove_deliver_flag({Id, Data}) ->
{Id, remove_deliver_flag(Data)}; {Id, remove_deliver_flag(Data)};
remove_deliver_flag(#inflight_data{message = Msg} = Data) ->
Data#inflight_data{message = remove_deliver_flag(Msg)};
remove_deliver_flag(List) when is_list(List) -> remove_deliver_flag(List) when is_list(List) ->
lists:map(fun remove_deliver_flag/1, List); lists:map(fun remove_deliver_flag/1, List);
remove_deliver_flag(Msg) -> remove_deliver_flag(Msg) ->
emqx_message:remove_header(deliver_begin_at, Msg). emqx_message:remove_header(deliver_begin_at, Msg).
inflight_data_to_msg({Id, Data}) -> set_duplicate_pub({Id, Msg}) ->
{Id, inflight_data_to_msg(Data)}; {Id, emqx_message:set_flag(dup, Msg)}.
inflight_data_to_msg(#inflight_data{message = Msg}) ->
Msg; get_packet_id({Id, _}) ->
inflight_data_to_msg(List) when is_list(List) -> Id.
lists:map(fun inflight_data_to_msg/1, List).