From a6408cee4fa14655c7476ad33e59857d0c6d7a5f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 13 Jan 2022 15:28:18 +0800 Subject: [PATCH] fix(session): update testcases for emqx_session --- src/emqx_session.erl | 26 +++++++++---------- test/emqx_session_SUITE.erl | 52 +++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 154516711..82b4c8168 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -392,7 +392,7 @@ dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) -> true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q), - deliver(Msgs, [], Session#session{mqueue = Q1}) + do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1}) end. dequeue(_ClientInfo, 0, Msgs, Q) -> @@ -422,22 +422,22 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1. -spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). deliver(ClientInfo, [Deliver], Session) -> %% Optimize - Enrich = enrich_fun(Session), - deliver_msg(ClientInfo, Enrich(Deliver), Session); + Msg = enrich_delivers(Deliver, Session), + deliver_msg(ClientInfo, Msg, Session); deliver(ClientInfo, Delivers, Session) -> - Msgs = lists:map(enrich_fun(Session), Delivers), - deliver(ClientInfo, Msgs, [], Session). + Msgs = [enrich_delivers(D, Session) || D <- Delivers], + do_deliver(ClientInfo, Msgs, [], Session). -deliver(_ClientInfo, [], Publishes, Session) -> +do_deliver(_ClientInfo, [], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver(ClientInfo, [Msg | More], Acc, Session) -> +do_deliver(ClientInfo, [Msg | More], Acc, Session) -> case deliver_msg(ClientInfo, Msg, Session) of {ok, Session1} -> - deliver(More, Acc, Session1); + do_deliver(ClientInfo, More, Acc, Session1); {ok, [Publish], Session1} -> - deliver(More, [Publish|Acc], Session1) + do_deliver(ClientInfo, More, [Publish|Acc], Session1) end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> @@ -462,7 +462,7 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(), session()) -> session()). enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> - Msgs = lists:map(enrich_fun(Session), Delivers), + Msgs = [enrich_delivers(D, Session) || D <- Delivers], lists:foldl(fun(Msg, Session0) -> enqueue(ClientInfo, Msg, Session0) end, Session, Msgs); @@ -487,10 +487,8 @@ log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) -> [emqx_message:format(Msg)]) end. -enrich_fun(Session = #session{subscriptions = Subs}) -> - fun({deliver, Topic, Msg}) -> - enrich_subopts(get_subopts(Topic, Subs), Msg, Session) - end. +enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) -> + enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> case emqx_shared_sub:is_ack_required(Msg) of diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index cb7c10cae..5dabeca26 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -165,7 +165,7 @@ t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), - {ok, Msg, Session1} = emqx_session:puback(1, Session), + {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> @@ -174,7 +174,7 @@ t_puback_with_dequeue(_) -> Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), - {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session), + {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(0, emqx_session:info(mqueue_len, Session1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). @@ -182,10 +182,10 @@ t_puback_with_dequeue(_) -> t_puback_error_packet_id_in_use(_) -> Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = - emqx_session:puback(1, session(#{inflight => Inflight})). + emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). t_puback_error_packet_id_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()). t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), @@ -213,17 +213,17 @@ t_pubrel_error_packetid_not_found(_) -> t_pubcomp(_) -> Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Session1} = emqx_session:pubcomp(1, Session), + {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_pubcomp_error_packetid_in_use(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session). + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session). t_pubcomp_error_packetid_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()). + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry @@ -231,14 +231,16 @@ t_pubcomp_error_packetid_not_found(_) -> t_dequeue(_) -> Q = mqueue(#{store_qos0 => true}), - {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})), + {ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})), Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>), emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>) ], - Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs), + Session1 = lists:foldl(fun(Msg, Session0) -> + emqx_session:enqueue(clientinfo(), Msg, Session0) + end, Session, Msgs), {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = - emqx_session:dequeue(Session1), + emqx_session:dequeue(clientinfo(), Session1), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), @@ -253,7 +255,7 @@ t_deliver_qos0(_) -> clientinfo(), <<"t1">>, subopts(), Session), Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = - emqx_session:deliver(Deliveries, Session1), + emqx_session:deliver(clientinfo(), Deliveries, Session1), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). @@ -262,38 +264,38 @@ t_deliver_qos1(_) -> {ok, Session} = emqx_session:subscribe( clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], - {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), + {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(2, Session2), + {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], {ok, [{1, Msg1}, {2, Msg2}], Session} = - emqx_session:deliver(Delivers, session()), + emqx_session:deliver(clientinfo(), Delivers, session()), ?assertEqual(2, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). t_deliver_one_msg(_) -> {ok, [{1, Msg}], Session} = - emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()), + emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()), ?assertEqual(1, emqx_session:info(inflight_cnt, Session)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). t_deliver_when_inflight_is_full(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{inflight => emqx_inflight:new(1)}), - {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ?assertEqual(1, length(Publishes)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(1, emqx_session:info(mqueue_len, Session1)), - {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1), + {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), @@ -301,18 +303,18 @@ t_deliver_when_inflight_is_full(_) -> t_enqueue(_) -> %% store_qos0 = true - Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()), - Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>), + Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()), + Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session), ?assertEqual(3, emqx_session:info(mqueue_len, Session1)). t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], Session = session(#{retry_interval => 100}), - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ok = timer:sleep(200), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), + {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -331,11 +333,11 @@ t_resume(_) -> t_replay(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()), + {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()), Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), - Session2 = emqx_session:enqueue(Msg, Session1), + Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], - {ok, ReplayPubs, Session3} = emqx_session:replay(Session2), + {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).