From 93092657b92df00c9c36621dfba94a97121e2cde Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 8 Feb 2022 18:14:50 +0800 Subject: [PATCH] fix(session): update testcases --- apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 91 +++++++++++++++++++++------ apps/emqx/test/emqx_session_SUITE.erl | 88 +++++++++++++------------- 3 files changed, 117 insertions(+), 64 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 565ce3930..65a7697d2 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -454,7 +454,7 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), - deliver(Msgs, [], Session#session{mqueue = Q1}) + do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1}) end. dequeue(_ClientInfo, 0, Msgs, Q) -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 19bc2b3c3..a0f3a935b 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -115,6 +115,56 @@ listeners_conf() -> ws => #{default => listener_mqtt_ws_conf()} }. +limiter_conf() -> + #{bytes_in => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + connection => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + message_in => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}, + message_routing => + #{bucket => + #{default => + #{aggregated => + #{capacity => infinity,initial => 0,rate => infinity}, + per_client => + #{capacity => infinity,divisible => false, + failure_strategy => force,initial => 0,low_water_mark => 0, + max_retry_time => 5000,rate => infinity}, + zone => default}}, + global => #{burst => 0,rate => infinity}, + zone => #{default => #{burst => 0,rate => infinity}}}}. + stats_conf() -> #{enable => true}. @@ -130,11 +180,11 @@ basic_conf() -> stats => stats_conf(), listeners => listeners_conf(), zones => zone_conf(), - limiter => emqx:get_config([limiter]) + limiter => limiter_conf() }. set_test_listener_confs() -> - Conf = emqx_config:get([]), + Conf = emqx_config:get([], #{}), emqx_config:put(basic_conf()), Conf. @@ -180,10 +230,10 @@ end_per_suite(_Config) -> ]). init_per_testcase(TestCase, Config) -> - NewConf = set_test_listener_confs(), + OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - modify_limiter(TestCase, NewConf), - [{config, NewConf}|Config]. + modify_limiter(TestCase, OldConf), + [{config, OldConf}|Config]. end_per_testcase(_TestCase, Config) -> emqx_config:put(?config(config, Config)), @@ -232,15 +282,16 @@ t_chan_info(_) -> ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> - #{max_clientid_len := 65535, + ?assertMatch(#{ + max_clientid_len := 65535, max_qos_allowed := 2, max_topic_alias := 65535, - max_topic_levels := 128, + max_topic_levels := Level, retain_available := true, shared_subscription := true, subscription_identifiers := true, wildcard_subscription := true - } = emqx_channel:caps(channel()). + } when is_integer(Level), emqx_channel:caps(channel())). %%-------------------------------------------------------------------- %% Test cases for channel handle_in @@ -377,14 +428,14 @@ t_handle_in_qos2_publish_with_error_return(_) -> t_handle_in_puback_ok(_) -> Msg = emqx_message:make(<<"t">>, <<"payload">>), ok = meck:expect(emqx_session, puback, - fun(_PacketId, Session) -> {ok, Msg, Session} end), + fun(_, _PacketId, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), {ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel). % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_puback_id_in_use(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -392,7 +443,7 @@ t_handle_in_puback_id_in_use(_) -> t_handle_in_puback_id_not_found(_) -> ok = meck:expect(emqx_session, puback, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()). @@ -430,14 +481,14 @@ t_override_client_receive_maximum(_) -> t_handle_in_pubrec_ok(_) -> Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>), - ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end), + ok = meck:expect(emqx_session, pubrec, fun(_, _, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrec_id_in_use(_) -> ok = meck:expect(emqx_session, pubrec, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} = @@ -445,34 +496,34 @@ t_handle_in_pubrec_id_in_use(_) -> t_handle_in_pubrec_id_not_found(_) -> ok = meck:expect(emqx_session, pubrec, - fun(_, _Session) -> + fun(_, _, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubrel_ok(_) -> - ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, pubrel, fun(_, _, Session) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrel_not_found_error(_) -> ok = meck:expect(emqx_session, pubrel, - fun(_PacketId, _Session) -> + fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubcomp_ok(_) -> - ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end), {ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()). % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)). t_handle_in_pubcomp_not_found_error(_) -> ok = meck:expect(emqx_session, pubcomp, - fun(_PacketId, _Session) -> + fun(_, _PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), Channel = channel(#{conn_state => connected}), @@ -795,13 +846,13 @@ t_handle_timeout_keepalive(_) -> t_handle_timeout_retry_delivery(_) -> TRef = make_ref(), - ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end), Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel). t_handle_timeout_expire_awaiting_rel(_) -> TRef = make_ref(), - ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end), Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel). diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 7976001a3..bc2cb426f 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -126,23 +126,23 @@ t_unsubscribe(_) -> t_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), - {ok, [], Session} = emqx_session:publish(undefined, Msg, Session). + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), undefined, Msg, Session). t_publish_qos1(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), - {ok, [], Session} = emqx_session:publish(2, Msg, Session). + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), 2, Msg, Session). t_publish_qos2(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(1, Msg, session()), + {ok, [], Session} = emqx_session:publish(clientinfo(), 1, Msg, session()), ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)), - {ok, Session1} = emqx_session:pubrel(1, Session), + {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, Session1). t_publish_qos2_with_error_return(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -150,10 +150,10 @@ t_publish_qos2_with_error_return(_) -> awaiting_rel => #{1 => ts(millisecond)} }), Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session), - {ok, [], Session1} = emqx_session:publish(2, Msg, Session), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(clientinfo(), 1, Msg, Session), + {ok, [], Session1} = emqx_session:publish(clientinfo(), 2, Msg, Session), ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), - {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(clientinfo(), 3, Msg, Session1). t_is_awaiting_full_false(_) -> Session = session(#{max_awaiting_rel => infinity}), @@ -169,7 +169,7 @@ t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), - {ok, Msg, Session1} = emqx_session:puback(1, Session), + {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> @@ -178,7 +178,7 @@ t_puback_with_dequeue(_) -> Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), - {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session), + {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(0, emqx_session:info(mqueue_len, Session1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). @@ -186,48 +186,48 @@ t_puback_with_dequeue(_) -> t_puback_error_packet_id_in_use(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:puback(1, session(#{inflight => Inflight})). + emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). t_puback_error_packet_id_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()). t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Msg, Session1} = emqx_session:pubrec(2, Session), + {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session), ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:pubrec(1, session(#{inflight => Inflight})). + emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})). t_pubrec_packet_id_not_found_error(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(clientinfo(), 1, session()). t_pubrel(_) -> Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}), - {ok, Session1} = emqx_session:pubrel(1, Session), + {ok, Session1} = emqx_session:pubrel(clientinfo(), 1, Session), ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). t_pubrel_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()). t_pubcomp(_) -> Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Session1} = emqx_session:pubcomp(1, Session), + {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_pubcomp_error_packetid_in_use(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session). + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session). t_pubcomp_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry @@ -235,14 +235,16 @@ t_pubcomp_error_packetid_not_found(_) -> t_dequeue(_) -> Q = mqueue(#{store_qos0 => true}), - {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})), + {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})), Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>), emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>) ], - Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs), + Session1 = lists:foldl(fun(Msg, S) -> + emqx_session:enqueue(clientinfo(), Msg, S) + end, Session, Msgs), {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = - emqx_session:dequeue(Session1), + emqx_session:dequeue(clientinfo(), Session1), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), @@ -257,7 +259,7 @@ t_deliver_qos0(_) -> clientinfo(), <<"t1">>, subopts(), Session), Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = - emqx_session:deliver(Deliveries, Session1), + emqx_session:deliver(clientinfo(), Deliveries, Session1), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -266,38 +268,38 @@ t_deliver_qos1(_) -> {ok, Session} = emqx_session:subscribe( clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], - {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), + {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(2, Session2), + {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], {ok, [{1, Msg1}, {2, Msg2}], Session} = - emqx_session:deliver(Delivers, session()), + emqx_session:deliver(clientinfo(), Delivers, session()), ?assertEqual(2, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). t_deliver_one_msg(_) -> {ok, [{1, Msg}], Session} = - emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()), + emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()), ?assertEqual(1, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). t_deliver_when_inflight_is_full(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{inflight => emqx_inflight:new(1)}), - {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(1, length(Publishes)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(1, emqx_session:info(mqueue_len, Session1)), - {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), @@ -305,8 +307,8 @@ t_deliver_when_inflight_is_full(_) -> t_enqueue(_) -> %% store_qos0 = true - Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()), - Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>), + Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()), + Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), ?assertEqual(3, emqx_session:info(mqueue_len, Session1)). @@ -314,11 +316,11 @@ t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], RetryIntervalMs = 100, %% 0.1s Session = session(#{retry_interval => RetryIntervalMs}), - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ElapseMs = 200, %% 0.2s ok = timer:sleep(ElapseMs), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), + {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -337,24 +339,24 @@ t_resume(_) -> t_replay(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()), Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), - Session2 = emqx_session:enqueue(Msg, Session1), + Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], - {ok, ReplayPubs, Session3} = emqx_session:replay(Session2), + {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) -> - {ok, Session} = emqx_session:expire(awaiting_rel, session()), + {ok, Session} = emqx_session:expire(clientinfo(), awaiting_rel, session()), Timeout = emqx_session:info(await_rel_timeout, Session), Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session), - {ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1), + {ok, Timeout, Session2} = emqx_session:expire(clientinfo(), awaiting_rel, Session1), ?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)). t_expire_awaiting_rel_all(_) -> Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}), - {ok, Session1} = emqx_session:expire(awaiting_rel, Session), + {ok, Session1} = emqx_session:expire(clientinfo(), awaiting_rel, Session), ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). %%--------------------------------------------------------------------