fix(session): update testcases

This commit is contained in:
Shawn 2022-02-08 18:14:50 +08:00
parent 2879001694
commit 93092657b9
3 changed files with 117 additions and 64 deletions

View File

@ -454,7 +454,7 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
true -> {ok, Session};
false ->
{Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
deliver(Msgs, [], Session#session{mqueue = Q1})
do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
end.
dequeue(_ClientInfo, 0, Msgs, Q) ->

View File

@ -115,6 +115,56 @@ listeners_conf() ->
ws => #{default => listener_mqtt_ws_conf()}
}.
limiter_conf() ->
#{bytes_in =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}},
connection =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}},
message_in =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}},
message_routing =>
#{bucket =>
#{default =>
#{aggregated =>
#{capacity => infinity,initial => 0,rate => infinity},
per_client =>
#{capacity => infinity,divisible => false,
failure_strategy => force,initial => 0,low_water_mark => 0,
max_retry_time => 5000,rate => infinity},
zone => default}},
global => #{burst => 0,rate => infinity},
zone => #{default => #{burst => 0,rate => infinity}}}}.
stats_conf() ->
#{enable => true}.
@ -130,11 +180,11 @@ basic_conf() ->
stats => stats_conf(),
listeners => listeners_conf(),
zones => zone_conf(),
limiter => emqx:get_config([limiter])
limiter => limiter_conf()
}.
set_test_listener_confs() ->
Conf = emqx_config:get([]),
Conf = emqx_config:get([], #{}),
emqx_config:put(basic_conf()),
Conf.
@ -180,10 +230,10 @@ end_per_suite(_Config) ->
]).
init_per_testcase(TestCase, Config) ->
NewConf = set_test_listener_confs(),
OldConf = set_test_listener_confs(),
emqx_common_test_helpers:start_apps([]),
modify_limiter(TestCase, NewConf),
[{config, NewConf}|Config].
modify_limiter(TestCase, OldConf),
[{config, OldConf}|Config].
end_per_testcase(_TestCase, Config) ->
emqx_config:put(?config(config, Config)),
@ -232,15 +282,16 @@ t_chan_info(_) ->
?assertEqual(clientinfo(), ClientInfo).
t_chan_caps(_) ->
#{max_clientid_len := 65535,
?assertMatch(#{
max_clientid_len := 65535,
max_qos_allowed := 2,
max_topic_alias := 65535,
max_topic_levels := 128,
max_topic_levels := Level,
retain_available := true,
shared_subscription := true,
subscription_identifiers := true,
wildcard_subscription := true
} = emqx_channel:caps(channel()).
} when is_integer(Level), emqx_channel:caps(channel())).
%%--------------------------------------------------------------------
%% Test cases for channel handle_in
@ -377,14 +428,14 @@ t_handle_in_qos2_publish_with_error_return(_) ->
t_handle_in_puback_ok(_) ->
Msg = emqx_message:make(<<"t">>, <<"payload">>),
ok = meck:expect(emqx_session, puback,
fun(_PacketId, Session) -> {ok, Msg, Session} end),
fun(_, _PacketId, Session) -> {ok, Msg, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
t_handle_in_puback_id_in_use(_) ->
ok = meck:expect(emqx_session, puback,
fun(_, _Session) ->
fun(_, _, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end),
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
@ -392,7 +443,7 @@ t_handle_in_puback_id_in_use(_) ->
t_handle_in_puback_id_not_found(_) ->
ok = meck:expect(emqx_session, puback,
fun(_, _Session) ->
fun(_, _, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
@ -430,14 +481,14 @@ t_override_client_receive_maximum(_) ->
t_handle_in_pubrec_ok(_) ->
Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
ok = meck:expect(emqx_session, pubrec, fun(_, _, Session) -> {ok, Msg, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} =
emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
t_handle_in_pubrec_id_in_use(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, _Session) ->
fun(_, _, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} =
@ -445,34 +496,34 @@ t_handle_in_pubrec_id_in_use(_) ->
t_handle_in_pubrec_id_not_found(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, _Session) ->
fun(_, _, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
t_handle_in_pubrel_ok(_) ->
ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
ok = meck:expect(emqx_session, pubrel, fun(_, _, Session) -> {ok, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} =
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
t_handle_in_pubrel_not_found_error(_) ->
ok = meck:expect(emqx_session, pubrel,
fun(_PacketId, _Session) ->
fun(_, _PacketId, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
t_handle_in_pubcomp_ok(_) ->
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end),
{ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()).
% ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
t_handle_in_pubcomp_not_found_error(_) ->
ok = meck:expect(emqx_session, pubcomp,
fun(_PacketId, _Session) ->
fun(_, _PacketId, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
Channel = channel(#{conn_state => connected}),
@ -795,13 +846,13 @@ t_handle_timeout_keepalive(_) ->
t_handle_timeout_retry_delivery(_) ->
TRef = make_ref(),
ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
t_handle_timeout_expire_awaiting_rel(_) ->
TRef = make_ref(),
ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).

View File

@ -126,23 +126,23 @@ t_unsubscribe(_) ->
t_publish_qos0(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(undefined, Msg, Session).
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session).
t_publish_qos1(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(2, Msg, Session).
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session).
t_publish_qos2(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, session()),
{ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()),
?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)),
{ok, Session1} = emqx_session:pubrel(1, Session),
{ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)),
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1).
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, Session1).
t_publish_qos2_with_error_return(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
@ -150,10 +150,10 @@ t_publish_qos2_with_error_return(_) ->
awaiting_rel => #{1 => ts(millisecond)}
}),
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session),
{ok, [], Session1} = emqx_session:publish(2, Msg, Session),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session),
{ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session),
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1).
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1).
t_is_awaiting_full_false(_) ->
Session = session(#{max_awaiting_rel => infinity}),
@ -169,7 +169,7 @@ t_puback(_) ->
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
{ok, Msg, Session1} = emqx_session:puback(1, Session),
{ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
t_puback_with_dequeue(_) ->
@ -178,7 +178,7 @@ t_puback_with_dequeue(_) ->
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
Session = session(#{inflight => Inflight, mqueue => Q}),
{ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session),
{ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session),
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
@ -186,48 +186,48 @@ t_puback_with_dequeue(_) ->
t_puback_error_packet_id_in_use(_) ->
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:puback(1, session(#{inflight => Inflight})).
emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
t_puback_error_packet_id_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()).
t_pubrec(_) ->
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Msg, Session1} = emqx_session:pubrec(2, Session),
{ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session),
?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
t_pubrec_packet_id_in_use_error(_) ->
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:pubrec(1, session(#{inflight => Inflight})).
emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})).
t_pubrec_packet_id_not_found_error(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(clientinfo(), 1, session()).
t_pubrel(_) ->
Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}),
{ok, Session1} = emqx_session:pubrel(1, Session),
{ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session),
?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
t_pubrel_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()).
t_pubcomp(_) ->
Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Session1} = emqx_session:pubcomp(1, Session),
{ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
t_pubcomp_error_packetid_in_use(_) ->
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session).
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session).
t_pubcomp_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()).
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()).
%%--------------------------------------------------------------------
%% Test cases for deliver/retry
@ -235,14 +235,16 @@ t_pubcomp_error_packetid_not_found(_) ->
t_dequeue(_) ->
Q = mqueue(#{store_qos0 => true}),
{ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})),
{ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})),
Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
],
Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs),
Session1 = lists:foldl(fun(Msg, S) ->
emqx_session:enqueue(clientinfo(), Msg, S)
end, Session, Msgs),
{ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
emqx_session:dequeue(Session1),
emqx_session:dequeue(clientinfo(), Session1),
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
@ -257,7 +259,7 @@ t_deliver_qos0(_) ->
clientinfo(), <<"t1">>, subopts(), Session),
Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
{ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
emqx_session:deliver(Deliveries, Session1),
emqx_session:deliver(clientinfo(), Deliveries, Session1),
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
@ -266,38 +268,38 @@ t_deliver_qos1(_) ->
{ok, Session} = emqx_session:subscribe(
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
{ok, Msg1, Session2} = emqx_session:puback(1, Session1),
{ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
{ok, Msg2, Session3} = emqx_session:puback(2, Session2),
{ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
t_deliver_qos2(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
{ok, [{1, Msg1}, {2, Msg2}], Session} =
emqx_session:deliver(Delivers, session()),
emqx_session:deliver(clientinfo(), Delivers, session()),
?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
t_deliver_one_msg(_) ->
{ok, [{1, Msg}], Session} =
emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()),
emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()),
?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
t_deliver_when_inflight_is_full(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
Session = session(#{inflight => emqx_inflight:new(1)}),
{ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session),
{ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
?assertEqual(1, length(Publishes)),
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
{ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1),
{ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1),
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
@ -305,8 +307,8 @@ t_deliver_when_inflight_is_full(_) ->
t_enqueue(_) ->
%% store_qos0 = true
Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()),
Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>),
Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()),
Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>),
delivery(?QOS_2, <<"t2">>)], Session),
?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
@ -314,11 +316,11 @@ t_retry(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
RetryIntervalMs = 100, %% 0.1s
Session = session(#{retry_interval => RetryIntervalMs}),
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
ElapseMs = 200, %% 0.2s
ok = timer:sleep(ElapseMs),
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
{ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
%%--------------------------------------------------------------------
@ -337,24 +339,24 @@ t_resume(_) ->
t_replay(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()),
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()),
Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
Session2 = emqx_session:enqueue(Msg, Session1),
Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
{ok, ReplayPubs, Session3} = emqx_session:replay(Session2),
{ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
t_expire_awaiting_rel(_) ->
{ok, Session} = emqx_session:expire(awaiting_rel, session()),
{ok, Session} = emqx_session:expire(clientinfo(), awaiting_rel, session()),
Timeout = emqx_session:info(await_rel_timeout, Session),
Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
{ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
{ok, Timeout, Session2} = emqx_session:expire(clientinfo(), awaiting_rel, Session1),
?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).
t_expire_awaiting_rel_all(_) ->
Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}),
{ok, Session1} = emqx_session:expire(awaiting_rel, Session),
{ok, Session1} = emqx_session:expire(clientinfo(), awaiting_rel, Session),
?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
%%--------------------------------------------------------------------