From d1c3dec730ec54397b8bb0268b56fe7373f63d2c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 10 Dec 2019 23:58:59 +0800 Subject: [PATCH] Optimize emqx_session and add more test cases --- src/emqx_channel.erl | 2 +- src/emqx_session.erl | 78 ++++++----- test/emqx_channel_SUITE.erl | 202 ++++++++++++++--------------- test/emqx_session_SUITE.erl | 251 ++++++++++++++++++++++++++++-------- 4 files changed, 342 insertions(+), 191 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index dc3940e41..67ce784b1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1210,7 +1210,7 @@ maybe_resume_session(#channel{resuming = false}) -> maybe_resume_session(#channel{session = Session, resuming = true, pendings = Pendings}) -> - {ok, Publishes, Session1} = emqx_session:redeliver(Session), + {ok, Publishes, Session1} = emqx_session:replay(Session), case emqx_session:deliver(Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 633d51b31..234ba2041 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -80,7 +80,7 @@ -export([ takeover/1 , resume/2 - , redeliver/1 + , replay/1 ]). -export([expire/2]). @@ -417,6 +417,10 @@ acc_msg(Msg, Msgs) -> -spec(deliver(list(emqx_types:deliver()), session()) -> {ok, session()} | {ok, replies(), session()}). +deliver([Deliver], Session) -> %% Optimize + Enrich = enrich_fun(Session), + deliver_msg(Enrich(Deliver), Session); + deliver(Delivers, Session) -> Msgs = lists:map(enrich_fun(Session), Delivers), deliver(Msgs, [], Session). @@ -424,12 +428,19 @@ deliver(Delivers, Session) -> deliver([], Publishes, Session) -> {ok, lists:reverse(Publishes), Session}; -deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) -> - Publish = {undefined, maybe_ack(Msg)}, - deliver(More, [Publish|Acc], Session); +deliver([Msg|More], Acc, Session) -> + case deliver_msg(Msg, Session) of + {ok, Session1} -> + deliver(More, Acc, Session1); + {ok, [Publish], Session1} -> + deliver(More, [Publish|Acc], Session1) + end. -deliver([Msg = #message{qos = QoS}|More], Acc, Session = - #session{next_pkt_id = PacketId, inflight = Inflight}) +deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> + {ok, [{undefined, maybe_ack(Msg)}], Session}; + +deliver_msg(Msg = #message{qos = QoS}, Session = + #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> @@ -437,15 +448,19 @@ deliver([Msg = #message{qos = QoS}|More], Acc, Session = true -> Session; false -> enqueue(Msg, Session) end, - deliver(More, Acc, Session1); + {ok, Session1}; false -> Publish = {PacketId, maybe_ack(Msg)}, Session1 = await(PacketId, Msg, Session), - deliver(More, [Publish|Acc], next_pkt_id(Session1)) + {ok, [Publish], next_pkt_id(Session1)} end. --spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), session()) - -> session()). +-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), + session()) -> session()). +enqueue([Deliver], Session) -> %% Optimize + Enrich = enrich_fun(Session), + enqueue(Enrich(Deliver), Session); + enqueue(Delivers, Session) when is_list(Delivers) -> Msgs = lists:map(enrich_fun(Session), Delivers), lists:foldl(fun enqueue/2, Session, Msgs); @@ -564,48 +579,43 @@ expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> - NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, - case maps:filter(NotExpired, AwaitingRel) of - [] -> {ok, Session}; - AwaitingRel1 -> - {ok, Timeout, Session#session{awaiting_rel = AwaitingRel1}} + AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), + NSession = Session#session{awaiting_rel = AwaitingRel1}, + case maps:size(AwaitingRel1) of + 0 -> {ok, NSession}; + _ -> {ok, Timeout, NSession} end. %%-------------------------------------------------------------------- -%% Takeover, Resume and Redeliver +%% Takeover, Resume and Replay %%-------------------------------------------------------------------- -spec(takeover(session()) -> ok). takeover(#session{subscriptions = Subs}) -> - lists:foreach(fun({TopicFilter, _SubOpts}) -> - ok = emqx_broker:unsubscribe(TopicFilter) - end, maps:to_list(Subs)). + lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)). -spec(resume(emqx_types:clientid(), session()) -> ok). resume(ClientId, #session{subscriptions = Subs}) -> - %% 1. Subscribe again. lists:foreach(fun({TopicFilter, SubOpts}) -> - ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) + ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) end, maps:to_list(Subs)). - %% 2. Run hooks. - %% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]), - %% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages - %%Session. --spec(redeliver(session()) -> {ok, replies(), session()}). -redeliver(Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun to_pub/1, emqx_inflight:to_list(Inflight)), +-spec(replay(session()) -> {ok, replies(), session()}). +replay(Session = #session{inflight = Inflight}) -> + Pubs = replay(Inflight), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} - end. + end; -to_pub({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId}; -to_pub({PacketId, {Msg, _Ts}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)}. +replay(Inflight) -> + lists:map(fun({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId}; + ({PacketId, {Msg, _Ts}}) -> + {PacketId, emqx_message:set_flag(dup, true, Msg)} + end, emqx_inflight:to_list(Inflight)). %%-------------------------------------------------------------------- %% Next Packet Id @@ -624,7 +634,7 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> -compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}). sort_fun() -> - fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end. + fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end. batch_n(Inflight) -> case emqx_inflight:max_size(Inflight) of diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 47fbbddf3..c582b3e68 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -92,16 +92,15 @@ t_chan_info(_) -> ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> - Caps = emqx_mqtt_caps:default(), - ?assertEqual(Caps, emqx_channel:caps(channel())). - -%%-------------------------------------------------------------------- -%% Test cases for channel init -%%-------------------------------------------------------------------- - -%% TODO: -t_chan_init(_) -> - _Channel = channel(). + #{max_clientid_len := 65535, + max_qos_allowed := 2, + max_topic_alias := 65535, + max_topic_levels := 0, + retain_available := true, + shared_subscription := true, + subscription_identifiers := true, + wildcard_subscription := true + } = emqx_channel:caps(channel()). %%-------------------------------------------------------------------- %% Test cases for channel handle_in @@ -113,8 +112,8 @@ t_handle_in_connect_packet_sucess(_) -> {ok, #{session => session(), present => false}} end), IdleChannel = channel(#{conn_state => idle}), - {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} - = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), ClientInfo = emqx_channel:info(clientinfo, Channel), ?assertMatch(#{clientid := <<"clientid">>, username := <<"username">> @@ -124,32 +123,47 @@ t_handle_in_connect_packet_sucess(_) -> t_handle_in_unexpected_connect_packet(_) -> Channel = emqx_channel:set_field(conn_state, connected, channel()), Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), - {ok, [{outgoing, Packet}, {close, protocol_error}], Channel} - = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel). + {ok, [{outgoing, Packet}, {close, protocol_error}], Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel). t_handle_in_qos0_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Channel = channel(#{conn_state => connected}), Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>), {ok, _NChannel} = emqx_channel:handle_in(Publish, Channel). - % ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)). t_handle_in_qos1_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Channel = channel(#{conn_state => connected}), Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), - {ok, ?PUBACK_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel), - ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)). - % ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)). + {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} = + emqx_channel:handle_in(Publish, channel(#{conn_state => connected})). t_handle_in_qos2_publish(_) -> - ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end), - ok = meck:expect(emqx_session, info, fun(await_rel_timeout, _Session) -> 300 end), - Channel = channel(#{conn_state => connected}), - Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), - {ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel), - ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)). - % ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)). + ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, 1}] end), + Channel = channel(#{conn_state => connected, session => session()}), + Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} = + emqx_channel:handle_in(Publish1, Channel), + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>), + {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel2} = + emqx_channel:handle_in(Publish2, Channel1), + ?assertEqual(2, proplists:get_value(awaiting_rel_cnt, emqx_channel:stats(Channel2))). + +t_handle_in_qos2_publish_with_error_return(_) -> + ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), + Channel = channel(#{conn_state => connected, session => Session}), + Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), + {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = + emqx_channel:handle_in(Publish1, Channel), + Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>), + {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel1} = + emqx_channel:handle_in(Publish2, Channel), + Publish3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>), + {ok, ?PUBREC_PACKET(3, ?RC_RECEIVE_MAXIMUM_EXCEEDED), Channel1} = + emqx_channel:handle_in(Publish3, Channel1). t_handle_in_puback_ok(_) -> Msg = emqx_message:make(<<"t">>, <<"payload">>), @@ -179,46 +193,38 @@ t_handle_in_pubrec_ok(_) -> Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>), ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end), Channel = channel(#{conn_state => connected}), - {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} - = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). - % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, - % emqx_channel:info(pub_stats, Channel1)). + {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} = + emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrec_id_in_use(_) -> ok = meck:expect(emqx_session, pubrec, fun(_, _Session) -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} end), - {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} - = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). - % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, - % emqx_channel:info(pub_stats, Channel)). + {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} = + emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubrec_id_not_found(_) -> ok = meck:expect(emqx_session, pubrec, fun(_, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), - {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} - = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). - % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1}, - % emqx_channel:info(pub_stats, Channel)). + {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = + emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubrel_ok(_) -> ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), - {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} - = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). - % ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1}, - % emqx_channel:info(pub_stats, Channel1)). + {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} = + emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel). t_handle_in_pubrel_not_found_error(_) -> ok = meck:expect(emqx_session, pubrel, fun(_PacketId, _Session) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end), - {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} - = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). + {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} = + emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()). t_handle_in_pubcomp_ok(_) -> ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end), @@ -232,7 +238,6 @@ t_handle_in_pubcomp_not_found_error(_) -> end), Channel = channel(#{conn_state => connected}), {ok, _Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel). - % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)). t_handle_in_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, @@ -249,12 +254,12 @@ t_handle_in_unsubscribe(_) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), - {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan} - = emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel). + {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan} = + emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel). t_handle_in_pingreq(_) -> - {ok, ?PACKET(?PINGRESP), _Channel} - = emqx_channel:handle_in(?PACKET(?PINGREQ), channel()). + {ok, ?PACKET(?PINGRESP), _Channel} = + emqx_channel:handle_in(?PACKET(?PINGREQ), channel()). t_handle_in_disconnect(_) -> Packet = ?DISCONNECT_PACKET(?RC_SUCCESS), @@ -265,38 +270,37 @@ t_handle_in_disconnect(_) -> t_handle_in_auth(_) -> Channel = channel(#{conn_state => connected}), Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR), - {ok, [{outgoing, Packet}, - {close, implementation_specific_error}], Channel} - = emqx_channel:handle_in(?AUTH_PACKET(), Channel). + {ok, [{outgoing, Packet}, {close, implementation_specific_error}], Channel} = + emqx_channel:handle_in(?AUTH_PACKET(), Channel). t_handle_in_frame_error(_) -> IdleChannel = channel(#{conn_state => idle}), - {shutdown, frame_too_large, _} - = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel), + {shutdown, frame_too_large, _Chan} = + emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel), ConnectingChan = channel(#{conn_state => connecting}), ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET), - {shutdown, frame_too_large, ConnackPacket, _} - = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), + {shutdown, frame_too_large, ConnackPacket, _} = + emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), ConnectedChan = channel(#{conn_state => connected}), - {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} - = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), + {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} = + emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), DisconnectedChan = channel(#{conn_state => disconnected}), - {ok, DisconnectedChan} - = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan). + {ok, DisconnectedChan} = + emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan). t_handle_in_expected_packet(_) -> Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), - {ok, [{outgoing, Packet}, {close, protocol_error}], _Chan} - = emqx_channel:handle_in(packet, channel()). + {ok, [{outgoing, Packet}, {close, protocol_error}], _Chan} = + emqx_channel:handle_in(packet, channel()). t_process_connect(_) -> ok = meck:expect(emqx_cm, open_session, fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} - = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})). + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = + emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})). t_process_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -306,8 +310,8 @@ t_process_publish_qos0(_) -> t_process_publish_qos1(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), - {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} - = emqx_channel:process_publish(Publish, channel()). + {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} = + emqx_channel:process_publish(Publish, channel()). t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), @@ -347,14 +351,14 @@ t_handle_out_publish(_) -> Channel = channel(#{conn_state => connected}), Pub0 = {undefined, emqx_message:make(<<"t">>, <<"qos0">>)}, Pub1 = {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)}, - {ok, {outgoing, Packets}, _NChannel} - = emqx_channel:handle_out(publish, [Pub0, Pub1], Channel), + {ok, {outgoing, Packets}, _NChannel} = + emqx_channel:handle_out(publish, [Pub0, Pub1], Channel), ?assertEqual(2, length(Packets)). t_handle_out_publish_1(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>), - {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} - = emqx_channel:handle_out(publish, [{1, Msg}], channel()). + {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} = + emqx_channel:handle_out(publish, [{1, Msg}], channel()). t_handle_out_publish_nl(_) -> ClientInfo = clientinfo(#{clientid => <<"clientid">>}), @@ -364,50 +368,47 @@ t_handle_out_publish_nl(_) -> {ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel). t_handle_out_connack_sucess(_) -> - {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} - = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = + emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()), ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_out_connack_failure(_) -> - {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} - = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()). + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} = + emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()). t_handle_out_puback(_) -> Channel = channel(#{conn_state => connected}), - {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel} - = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel). - % ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)). + {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel} = + emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel). t_handle_out_pubrec(_) -> Channel = channel(#{conn_state => connected}), - {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel} - = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel). + {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel} = + emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel). t_handle_out_pubrel(_) -> Channel = channel(#{conn_state => connected}), - {ok, ?PUBREL_PACKET(1), Channel1} - = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel), - {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2} - = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1). + {ok, ?PUBREL_PACKET(1), Channel1} = + emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel), + {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2} = + emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1). t_handle_out_pubcomp(_) -> - {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel} - = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()). + {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel} = + emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()). t_handle_out_suback(_) -> Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_2])}, {event, updated}], - {ok, Replies, _Channel} - = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()). + {ok, Replies, _Chan} = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()). t_handle_out_unsuback(_) -> Replies = [{outgoing, ?UNSUBACK_PACKET(1, [?RC_SUCCESS])}, {event, updated}], - {ok, Replies, _Channel} - = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()). + {ok, Replies, _Chan} = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()). t_handle_out_disconnect(_) -> Packet = ?DISCONNECT_PACKET(?RC_SUCCESS), - {ok, [{outgoing, Packet}, {close, normal}], _Chan} - = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()). + {ok, [{outgoing, Packet}, {close, normal}], _Chan} = + emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()). t_handle_out_unexpected(_) -> {ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()). @@ -421,20 +422,19 @@ t_handle_call_kick(_) -> t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), - {shutdown, discarded, ok, Packet, _Channel} - = emqx_channel:handle_call(discard, channel()). + {shutdown, discarded, ok, Packet, _Channel} = + emqx_channel:handle_call(discard, channel()). t_handle_call_takeover_begin(_) -> - {reply, undefined, _Channel} - = emqx_channel:handle_call({takeover, 'begin'}, channel()). + {reply, undefined, _Chan} = emqx_channel:handle_call({takeover, 'begin'}, channel()). t_handle_call_takeover_end(_) -> ok = meck:expect(emqx_session, takeover, fun(_) -> ok end), - {shutdown, takeovered, [], _Channel} - = emqx_channel:handle_call({takeover, 'end'}, channel()). + {shutdown, takeovered, [], _Chan} = + emqx_channel:handle_call({takeover, 'end'}, channel()). t_handle_call_unexpected(_) -> - {reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()). + {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_info @@ -506,8 +506,8 @@ t_auth_connect(_) -> t_process_alias(_) -> Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}, Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()), - {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} - = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel). + {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} = + emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel). t_check_pub_acl(_) -> ok = meck:new(emqx_zone, [passthrough, no_history]), @@ -607,6 +607,6 @@ session(InitFields) when is_map(InitFields) -> maps:fold(fun(Field, Value, Session) -> emqx_session:set_field(Field, Value, Session) end, - emqx_session:init(#{zone => zone}, #{receive_maximum => 0}), + emqx_session:init(#{zone => channel}, #{receive_maximum => 0}), InitFields). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index fa4798391..1112f21ec 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -23,8 +23,6 @@ -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). --import(emqx_session, [set_field/3]). - all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- @@ -57,6 +55,7 @@ t_session_init(_) -> ?assertEqual(64, emqx_session:info(inflight_max, Session)), ?assertEqual(1, emqx_session:info(next_pkt_id, Session)), ?assertEqual(0, emqx_session:info(retry_interval, Session)), + ?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)), ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)), ?assertEqual(300, emqx_session:info(await_rel_timeout, Session)), @@ -86,12 +85,11 @@ t_session_stats(_) -> }, maps:from_list(Stats)). %%-------------------------------------------------------------------- -%% Test cases for pub/sub +%% Test cases for sub/unsub %%-------------------------------------------------------------------- t_subscribe(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), - ok = meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end), {ok, Session} = emqx_session:subscribe( clientinfo(), <<"#">>, subopts(), session()), ?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)). @@ -101,110 +99,217 @@ t_is_subscriptions_full_false(_) -> ?assertNot(emqx_session:is_subscriptions_full(Session)). t_is_subscriptions_full_true(_) -> + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Session = session(#{max_subscriptions => 1}), ?assertNot(emqx_session:is_subscriptions_full(Session)), - Subs = #{<<"t1">> => subopts(), <<"t2">> => subopts()}, - NSession = set_field(subscriptions, Subs, Session), - ?assert(emqx_session:is_subscriptions_full(NSession)). + {ok, Session1} = emqx_session:subscribe( + clientinfo(), <<"t1">>, subopts(), Session), + ?assert(emqx_session:is_subscriptions_full(Session1)), + {error, ?RC_QUOTA_EXCEEDED} = + emqx_session:subscribe(clientinfo(), <<"t2">>, subopts(), Session1). t_unsubscribe(_) -> ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), Session = session(#{subscriptions => #{<<"#">> => subopts()}}), - {ok, NSession} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session), - Error = emqx_session:unsubscribe(clientinfo(), <<"#">>, NSession), - ?assertEqual({error, ?RC_NO_SUBSCRIPTION_EXISTED}, Error). + {ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session), + {error, ?RC_NO_SUBSCRIPTION_EXISTED} = + emqx_session:unsubscribe(clientinfo(), <<"#">>, Session1). t_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Msg = emqx_message:make(test, ?QOS_0, <<"t">>, <<"payload">>), - {ok, [], Session} = emqx_session:publish(0, Msg, Session = session()). + Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>), + {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(undefined, Msg, Session). t_publish_qos1(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>), - {ok, [], _Session} = emqx_session:publish(1, Msg, session()). + Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>), + {ok, [], Session} = emqx_session:publish(1, Msg, Session = session()), + {ok, [], Session} = emqx_session:publish(2, Msg, Session). t_publish_qos2(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), - Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>), + Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), {ok, [], Session} = emqx_session:publish(1, Msg, session()), - ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)). + ?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)), + {ok, Session1} = emqx_session:pubrel(1, Session), + ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)), + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1). + +t_publish_qos2_with_error_return(_) -> + ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), + Session = session(#{max_awaiting_rel => 2, + awaiting_rel => #{1 => ts(millisecond)} + }), + Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session), + {ok, [], Session1} = emqx_session:publish(2, Msg, Session), + ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1). t_is_awaiting_full_false(_) -> - ?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))). + Session = session(#{max_awaiting_rel => 0}), + ?assertNot(emqx_session:is_awaiting_full(Session)). t_is_awaiting_full_true(_) -> Session = session(#{max_awaiting_rel => 1, - awaiting_rel => #{1 => 1} + awaiting_rel => #{1 => ts(millisecond)} }), ?assert(emqx_session:is_awaiting_full(Session)). t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(1, {Msg, erlang:system_time(millisecond)}, emqx_inflight:new()), - Session = set_field(inflight, Inflight, session()), - {ok, Msg, NSession} = emqx_session:puback(1, Session), - ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)). + Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), + Session = session(#{inflight => Inflight, mqueue => mqueue()}), + {ok, Msg, Session1} = emqx_session:puback(1, Session), + ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). + +t_puback_with_dequeue(_) -> + Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>), + Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()), + Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), + {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), + Session = session(#{inflight => Inflight, mqueue => Q}), + {ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session), + ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), + ?assertEqual(0, emqx_session:info(mqueue_len, Session1)), + ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). t_puback_error_packet_id_in_use(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()), - Session = set_field(inflight, Inflight, session()), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session). + Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = + emqx_session:puback(1, session(#{inflight => Inflight})). t_puback_error_packet_id_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()). t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(2, {Msg, erlang:system_time(millisecond)}, emqx_inflight:new()), - Session = set_field(inflight, Inflight, session()), - {ok, Msg, NSession} = emqx_session:pubrec(2, Session), - ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, NSession))). + Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), + Session = session(#{inflight => Inflight}), + {ok, Msg, Session1} = emqx_session:pubrec(2, Session), + ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()), - Session = set_field(inflight, Inflight, session()), - {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session). + Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = + emqx_session:pubrec(1, session(#{inflight => Inflight})). t_pubrec_packet_id_not_found_error(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()). t_pubrel(_) -> - Session = set_field(awaiting_rel, #{1 => erlang:system_time(millisecond)}, session()), - {ok, NSession} = emqx_session:pubrel(1, Session), - ?assertEqual(#{}, emqx_session:info(awaiting_rel, NSession)). + Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}), + {ok, Session1} = emqx_session:pubrel(1, Session), + ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). -t_pubrel_id_not_found(_) -> +t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(2, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()), - Session = emqx_session:set_field(inflight, Inflight, session()), - {ok, NSession} = emqx_session:pubcomp(2, Session), - ?assertEqual(0, emqx_session:info(inflight_cnt, NSession)). + Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Session = session(#{inflight => Inflight}), + {ok, Session1} = emqx_session:pubcomp(1, Session), + ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). -t_pubcomp_id_not_found(_) -> - {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()). +t_pubcomp_error_packetid_in_use(_) -> + Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), + Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), + Session = session(#{inflight => Inflight}), + {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session). + +t_pubcomp_error_packetid_not_found(_) -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()). %%-------------------------------------------------------------------- %% Test cases for deliver/retry %%-------------------------------------------------------------------- t_dequeue(_) -> - {ok, _Session} = emqx_session:dequeue(session()). + Q = mqueue(#{store_qos0 => true}), + {ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})), + Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>), + emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), + emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>) + ], + Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs), + {ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} = + emqx_session:dequeue(Session1), + ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), + ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)), + ?assertEqual(<<"t0">>, emqx_message:topic(Msg0)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), + ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)). -t_deliver(_) -> +t_deliver_qos0(_) -> + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + {ok, Session} = emqx_session:subscribe( + clientinfo(), <<"t0">>, subopts(), session()), + {ok, Session1} = emqx_session:subscribe( + clientinfo(), <<"t1">>, subopts(), Session), + Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]], + {ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} = + emqx_session:deliver(Deliveries, Session1), + ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). + +t_deliver_qos1(_) -> + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + {ok, Session} = emqx_session:subscribe( + clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), + Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], + {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), + ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), + ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), + {ok, Msg1, Session2} = emqx_session:puback(1, Session1), + ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), + {ok, Msg2, Session3} = emqx_session:puback(2, Session2), + ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). + +t_deliver_qos2(_) -> + ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), + Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)], + {ok, [{1, Msg1}, {2, Msg2}], Session} = + emqx_session:deliver(Delivers, session()), + ?assertEqual(2, emqx_session:info(inflight_cnt, Session)), + ?assertEqual(<<"t0">>, emqx_message:topic(Msg1)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg2)). + +t_deliver_one_msg(_) -> + {ok, [{1, Msg}], Session} = + emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()), + ?assertEqual(1, emqx_session:info(inflight_cnt, Session)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg)). + +t_deliver_when_inflight_is_full(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - {ok, Publishes, _Session} = emqx_session:deliver(Delivers, session()), - ?assertEqual(2, length(Publishes)). + Session = session(#{inflight => emqx_inflight:new(1)}), + {ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session), + ?assertEqual(1, length(Publishes)), + ?assertEqual(1, emqx_session:info(inflight_cnt, Session1)), + ?assertEqual(1, emqx_session:info(mqueue_len, Session1)), + {ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1), + ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), + ?assertEqual(0, emqx_session:info(mqueue_len, Session2)), + ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), + ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)). t_enqueue(_) -> - Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - Session = emqx_session:enqueue(Delivers, session()), - ?assertEqual(2, emqx_session:info(mqueue_len, Session)). + %% store_qos0 = true + Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()), + Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>), + delivery(?QOS_2, <<"t2">>)], Session), + ?assertEqual(3, emqx_session:info(mqueue_len, Session1)). t_retry(_) -> - {ok, _Session} = emqx_session:retry(session()). + Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], + Session = session(#{retry_interval => 100}), + {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), + ok = timer:sleep(200), + Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], + {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), + ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- %% Test cases for takeover/resume @@ -220,22 +325,53 @@ t_resume(_) -> Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}), ok = emqx_session:resume(<<"clientid">>, Session). -t_redeliver(_) -> - {ok, [], _Session} = emqx_session:redeliver(session()). +t_replay(_) -> + Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], + {ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()), + Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>), + Session2 = emqx_session:enqueue(Msg, Session1), + Pubs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], + {ok, ReplayPubs, Session3} = emqx_session:replay(Session2), + ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), + ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). -t_expire(_) -> - {ok, _Session} = emqx_session:expire(awaiting_rel, session()). +t_expire_awaiting_rel(_) -> + {ok, Session} = emqx_session:expire(awaiting_rel, session()), + Timeout = emqx_session:info(await_rel_timeout, Session) * 1000, + Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session), + {ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1), + ?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)). + +t_expire_awaiting_rel_all(_) -> + Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}), + {ok, Session1} = emqx_session:expire(awaiting_rel, Session), + ?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)). + +%%-------------------------------------------------------------------- +%% CT for utility functions +%%-------------------------------------------------------------------- + +t_next_pakt_id(_) -> + Session = session(#{next_pkt_id => 16#FFFF}), + Session1 = emqx_session:next_pkt_id(Session), + ?assertEqual(1, emqx_session:info(next_pkt_id, Session1)), + Session2 = emqx_session:next_pkt_id(Session1), + ?assertEqual(2, emqx_session:info(next_pkt_id, Session2)). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- +mqueue() -> mqueue(#{}). +mqueue(Opts) -> + emqx_mqueue:init(maps:merge(#{max_len => 0, store_qos0 => false}, Opts)). + session() -> session(#{}). session(InitFields) when is_map(InitFields) -> maps:fold(fun(Field, Value, Session) -> emqx_session:set_field(Field, Value, Session) end, - emqx_session:init(#{zone => zone}, #{receive_maximum => 0}), + emqx_session:init(#{zone => channel}, #{receive_maximum => 0}), InitFields). @@ -252,3 +388,8 @@ subopts(Init) -> delivery(QoS, Topic) -> {deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}. +ts(second) -> + erlang:system_time(second); +ts(millisecond) -> + erlang:system_time(millisecond). +