Optimize emqx_session and add more test cases

This commit is contained in:
Feng Lee 2019-12-10 23:58:59 +08:00 committed by turtleDeng
parent fff737d32a
commit d1c3dec730
4 changed files with 342 additions and 191 deletions

View File

@ -1210,7 +1210,7 @@ maybe_resume_session(#channel{resuming = false}) ->
maybe_resume_session(#channel{session = Session,
resuming = true,
pendings = Pendings}) ->
{ok, Publishes, Session1} = emqx_session:redeliver(Session),
{ok, Publishes, Session1} = emqx_session:replay(Session),
case emqx_session:deliver(Pendings, Session1) of
{ok, Session2} ->
{ok, Publishes, Session2};

View File

@ -80,7 +80,7 @@
-export([ takeover/1
, resume/2
, redeliver/1
, replay/1
]).
-export([expire/2]).
@ -417,6 +417,10 @@ acc_msg(Msg, Msgs) ->
-spec(deliver(list(emqx_types:deliver()), session())
-> {ok, session()} | {ok, replies(), session()}).
deliver([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
deliver_msg(Enrich(Deliver), Session);
deliver(Delivers, Session) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
deliver(Msgs, [], Session).
@ -424,12 +428,19 @@ deliver(Delivers, Session) ->
deliver([], Publishes, Session) ->
{ok, lists:reverse(Publishes), Session};
deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
Publish = {undefined, maybe_ack(Msg)},
deliver(More, [Publish|Acc], Session);
deliver([Msg|More], Acc, Session) ->
case deliver_msg(Msg, Session) of
{ok, Session1} ->
deliver(More, Acc, Session1);
{ok, [Publish], Session1} ->
deliver(More, [Publish|Acc], Session1)
end.
deliver([Msg = #message{qos = QoS}|More], Acc, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
@ -437,15 +448,19 @@ deliver([Msg = #message{qos = QoS}|More], Acc, Session =
true -> Session;
false -> enqueue(Msg, Session)
end,
deliver(More, Acc, Session1);
{ok, Session1};
false ->
Publish = {PacketId, maybe_ack(Msg)},
Session1 = await(PacketId, Msg, Session),
deliver(More, [Publish|Acc], next_pkt_id(Session1))
{ok, [Publish], next_pkt_id(Session1)}
end.
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), session())
-> session()).
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
session()) -> session()).
enqueue([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
enqueue(Enrich(Deliver), Session);
enqueue(Delivers, Session) when is_list(Delivers) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
lists:foldl(fun enqueue/2, Session, Msgs);
@ -564,48 +579,43 @@ expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
case maps:filter(NotExpired, AwaitingRel) of
[] -> {ok, Session};
AwaitingRel1 ->
{ok, Timeout, Session#session{awaiting_rel = AwaitingRel1}}
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
NSession = Session#session{awaiting_rel = AwaitingRel1},
case maps:size(AwaitingRel1) of
0 -> {ok, NSession};
_ -> {ok, Timeout, NSession}
end.
%%--------------------------------------------------------------------
%% Takeover, Resume and Redeliver
%% Takeover, Resume and Replay
%%--------------------------------------------------------------------
-spec(takeover(session()) -> ok).
takeover(#session{subscriptions = Subs}) ->
lists:foreach(fun({TopicFilter, _SubOpts}) ->
ok = emqx_broker:unsubscribe(TopicFilter)
end, maps:to_list(Subs)).
lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)).
-spec(resume(emqx_types:clientid(), session()) -> ok).
resume(ClientId, #session{subscriptions = Subs}) ->
%% 1. Subscribe again.
lists:foreach(fun({TopicFilter, SubOpts}) ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
end, maps:to_list(Subs)).
%% 2. Run hooks.
%% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]),
%% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
%%Session.
-spec(redeliver(session()) -> {ok, replies(), session()}).
redeliver(Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun to_pub/1, emqx_inflight:to_list(Inflight)),
-spec(replay(session()) -> {ok, replies(), session()}).
replay(Session = #session{inflight = Inflight}) ->
Pubs = replay(Inflight),
case dequeue(Session) of
{ok, NSession} -> {ok, Pubs, NSession};
{ok, More, NSession} ->
{ok, lists:append(Pubs, More), NSession}
end.
end;
to_pub({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId};
to_pub({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}.
replay(Inflight) ->
lists:map(fun({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId};
({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
end, emqx_inflight:to_list(Inflight)).
%%--------------------------------------------------------------------
%% Next Packet Id
@ -624,7 +634,7 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
-compile({inline, [sort_fun/0, batch_n/1, with_ts/1, age/2]}).
sort_fun() ->
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end.
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end.
batch_n(Inflight) ->
case emqx_inflight:max_size(Inflight) of

View File

@ -92,16 +92,15 @@ t_chan_info(_) ->
?assertEqual(clientinfo(), ClientInfo).
t_chan_caps(_) ->
Caps = emqx_mqtt_caps:default(),
?assertEqual(Caps, emqx_channel:caps(channel())).
%%--------------------------------------------------------------------
%% Test cases for channel init
%%--------------------------------------------------------------------
%% TODO:
t_chan_init(_) ->
_Channel = channel().
#{max_clientid_len := 65535,
max_qos_allowed := 2,
max_topic_alias := 65535,
max_topic_levels := 0,
retain_available := true,
shared_subscription := true,
subscription_identifiers := true,
wildcard_subscription := true
} = emqx_channel:caps(channel()).
%%--------------------------------------------------------------------
%% Test cases for channel handle_in
@ -113,8 +112,8 @@ t_handle_in_connect_packet_sucess(_) ->
{ok, #{session => session(), present => false}}
end),
IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
ClientInfo = emqx_channel:info(clientinfo, Channel),
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>
@ -124,32 +123,47 @@ t_handle_in_connect_packet_sucess(_) ->
t_handle_in_unexpected_connect_packet(_) ->
Channel = emqx_channel:set_field(conn_state, connected, channel()),
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel}
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
t_handle_in_qos0_publish(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Channel = channel(#{conn_state => connected}),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
{ok, _NChannel} = emqx_channel:handle_in(Publish, Channel).
% ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
t_handle_in_qos1_publish(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Channel = channel(#{conn_state => connected}),
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
% ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
{ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
emqx_channel:handle_in(Publish, channel(#{conn_state => connected})).
t_handle_in_qos2_publish(_) ->
ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
ok = meck:expect(emqx_session, info, fun(await_rel_timeout, _Session) -> 300 end),
Channel = channel(#{conn_state => connected}),
Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
% ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, 1}] end),
Channel = channel(#{conn_state => connected, session => session()}),
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
emqx_channel:handle_in(Publish1, Channel),
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
{ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel2} =
emqx_channel:handle_in(Publish2, Channel1),
?assertEqual(2, proplists:get_value(awaiting_rel_cnt, emqx_channel:stats(Channel2))).
t_handle_in_qos2_publish_with_error_return(_) ->
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
Channel = channel(#{conn_state => connected, session => Session}),
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
emqx_channel:handle_in(Publish1, Channel),
Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
{ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel1} =
emqx_channel:handle_in(Publish2, Channel),
Publish3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
{ok, ?PUBREC_PACKET(3, ?RC_RECEIVE_MAXIMUM_EXCEEDED), Channel1} =
emqx_channel:handle_in(Publish3, Channel1).
t_handle_in_puback_ok(_) ->
Msg = emqx_message:make(<<"t">>, <<"payload">>),
@ -179,46 +193,38 @@ t_handle_in_pubrec_ok(_) ->
Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1}
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
% ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
% emqx_channel:info(pub_stats, Channel1)).
{ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} =
emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
t_handle_in_pubrec_id_in_use(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel}
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
% ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
% emqx_channel:info(pub_stats, Channel)).
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} =
emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
t_handle_in_pubrec_id_not_found(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
= emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
% ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
% emqx_channel:info(pub_stats, Channel)).
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
t_handle_in_pubrel_ok(_) ->
ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1}
= emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
% ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
% emqx_channel:info(pub_stats, Channel1)).
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} =
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
t_handle_in_pubrel_not_found_error(_) ->
ok = meck:expect(emqx_session, pubrel,
fun(_PacketId, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
= emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
{ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
t_handle_in_pubcomp_ok(_) ->
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
@ -232,7 +238,6 @@ t_handle_in_pubcomp_not_found_error(_) ->
end),
Channel = channel(#{conn_state => connected}),
{ok, _Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel).
% ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
t_handle_in_subscribe(_) ->
ok = meck:expect(emqx_session, subscribe,
@ -249,12 +254,12 @@ t_handle_in_unsubscribe(_) ->
{ok, Session}
end),
Channel = channel(#{conn_state => connected}),
{ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan}
= emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
{ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan} =
emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
t_handle_in_pingreq(_) ->
{ok, ?PACKET(?PINGRESP), _Channel}
= emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
{ok, ?PACKET(?PINGRESP), _Channel} =
emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
t_handle_in_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
@ -265,38 +270,37 @@ t_handle_in_disconnect(_) ->
t_handle_in_auth(_) ->
Channel = channel(#{conn_state => connected}),
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
{ok, [{outgoing, Packet},
{close, implementation_specific_error}], Channel}
= emqx_channel:handle_in(?AUTH_PACKET(), Channel).
{ok, [{outgoing, Packet}, {close, implementation_specific_error}], Channel} =
emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) ->
IdleChannel = channel(#{conn_state => idle}),
{shutdown, frame_too_large, _}
= emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
{shutdown, frame_too_large, _Chan} =
emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
ConnectingChan = channel(#{conn_state => connecting}),
ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
{shutdown, frame_too_large, ConnackPacket, _}
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
{shutdown, frame_too_large, ConnackPacket, _} =
emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET),
ConnectedChan = channel(#{conn_state => connected}),
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _}
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} =
emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
DisconnectedChan = channel(#{conn_state => disconnected}),
{ok, DisconnectedChan}
= emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
{ok, DisconnectedChan} =
emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
t_handle_in_expected_packet(_) ->
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
{ok, [{outgoing, Packet}, {close, protocol_error}], _Chan}
= emqx_channel:handle_in(packet, channel()).
{ok, [{outgoing, Packet}, {close, protocol_error}], _Chan} =
emqx_channel:handle_in(packet, channel()).
t_process_connect(_) ->
ok = meck:expect(emqx_cm, open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan}
= emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
t_process_publish_qos0(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
@ -306,8 +310,8 @@ t_process_publish_qos0(_) ->
t_process_publish_qos1(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
= emqx_channel:process_publish(Publish, channel()).
{ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
emqx_channel:process_publish(Publish, channel()).
t_process_subscribe(_) ->
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
@ -347,14 +351,14 @@ t_handle_out_publish(_) ->
Channel = channel(#{conn_state => connected}),
Pub0 = {undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
Pub1 = {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
{ok, {outgoing, Packets}, _NChannel}
= emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
{ok, {outgoing, Packets}, _NChannel} =
emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
?assertEqual(2, length(Packets)).
t_handle_out_publish_1(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
= emqx_channel:handle_out(publish, [{1, Msg}], channel()).
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} =
emqx_channel:handle_out(publish, [{1, Msg}], channel()).
t_handle_out_publish_nl(_) ->
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
@ -364,50 +368,47 @@ t_handle_out_publish_nl(_) ->
{ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
t_handle_out_connack_sucess(_) ->
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
= emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
t_handle_out_connack_failure(_) ->
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
= emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
t_handle_out_puback(_) ->
Channel = channel(#{conn_state => connected}),
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel}
= emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel).
% ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
{ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel} =
emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel).
t_handle_out_pubrec(_) ->
Channel = channel(#{conn_state => connected}),
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel}
= emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel} =
emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
t_handle_out_pubrel(_) ->
Channel = channel(#{conn_state => connected}),
{ok, ?PUBREL_PACKET(1), Channel1}
= emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
{ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2}
= emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
{ok, ?PUBREL_PACKET(1), Channel1} =
emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
{ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2} =
emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
t_handle_out_pubcomp(_) ->
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel}
= emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel} =
emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
t_handle_out_suback(_) ->
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_2])}, {event, updated}],
{ok, Replies, _Channel}
= emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
{ok, Replies, _Chan} = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
t_handle_out_unsuback(_) ->
Replies = [{outgoing, ?UNSUBACK_PACKET(1, [?RC_SUCCESS])}, {event, updated}],
{ok, Replies, _Channel}
= emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
{ok, Replies, _Chan} = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
t_handle_out_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
{ok, [{outgoing, Packet}, {close, normal}], _Chan}
= emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
{ok, [{outgoing, Packet}, {close, normal}], _Chan} =
emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
t_handle_out_unexpected(_) ->
{ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
@ -421,20 +422,19 @@ t_handle_call_kick(_) ->
t_handle_call_discard(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
{shutdown, discarded, ok, Packet, _Channel}
= emqx_channel:handle_call(discard, channel()).
{shutdown, discarded, ok, Packet, _Channel} =
emqx_channel:handle_call(discard, channel()).
t_handle_call_takeover_begin(_) ->
{reply, undefined, _Channel}
= emqx_channel:handle_call({takeover, 'begin'}, channel()).
{reply, undefined, _Chan} = emqx_channel:handle_call({takeover, 'begin'}, channel()).
t_handle_call_takeover_end(_) ->
ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
{shutdown, takeovered, [], _Channel}
= emqx_channel:handle_call({takeover, 'end'}, channel()).
{shutdown, takeovered, [], _Chan} =
emqx_channel:handle_call({takeover, 'end'}, channel()).
t_handle_call_unexpected(_) ->
{reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()).
{reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
%%--------------------------------------------------------------------
%% Test cases for handle_info
@ -506,8 +506,8 @@ t_auth_connect(_) ->
t_process_alias(_) ->
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()),
{ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan}
= emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
{ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} =
emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
t_check_pub_acl(_) ->
ok = meck:new(emqx_zone, [passthrough, no_history]),
@ -607,6 +607,6 @@ session(InitFields) when is_map(InitFields) ->
maps:fold(fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session)
end,
emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
InitFields).

View File

@ -23,8 +23,6 @@
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_session, [set_field/3]).
all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
@ -57,6 +55,7 @@ t_session_init(_) ->
?assertEqual(64, emqx_session:info(inflight_max, Session)),
?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
?assertEqual(0, emqx_session:info(retry_interval, Session)),
?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))),
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
?assertEqual(300, emqx_session:info(await_rel_timeout, Session)),
@ -86,12 +85,11 @@ t_session_stats(_) ->
}, maps:from_list(Stats)).
%%--------------------------------------------------------------------
%% Test cases for pub/sub
%% Test cases for sub/unsub
%%--------------------------------------------------------------------
t_subscribe(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
ok = meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
{ok, Session} = emqx_session:subscribe(
clientinfo(), <<"#">>, subopts(), session()),
?assertEqual(1, emqx_session:info(subscriptions_cnt, Session)).
@ -101,110 +99,217 @@ t_is_subscriptions_full_false(_) ->
?assertNot(emqx_session:is_subscriptions_full(Session)).
t_is_subscriptions_full_true(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
Session = session(#{max_subscriptions => 1}),
?assertNot(emqx_session:is_subscriptions_full(Session)),
Subs = #{<<"t1">> => subopts(), <<"t2">> => subopts()},
NSession = set_field(subscriptions, Subs, Session),
?assert(emqx_session:is_subscriptions_full(NSession)).
{ok, Session1} = emqx_session:subscribe(
clientinfo(), <<"t1">>, subopts(), Session),
?assert(emqx_session:is_subscriptions_full(Session1)),
{error, ?RC_QUOTA_EXCEEDED} =
emqx_session:subscribe(clientinfo(), <<"t2">>, subopts(), Session1).
t_unsubscribe(_) ->
ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
Session = session(#{subscriptions => #{<<"#">> => subopts()}}),
{ok, NSession} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session),
Error = emqx_session:unsubscribe(clientinfo(), <<"#">>, NSession),
?assertEqual({error, ?RC_NO_SUBSCRIPTION_EXISTED}, Error).
{ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session),
{error, ?RC_NO_SUBSCRIPTION_EXISTED} =
emqx_session:unsubscribe(clientinfo(), <<"#">>, Session1).
t_publish_qos0(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_0, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(0, Msg, Session = session()).
Msg = emqx_message:make(clientid, ?QOS_0, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(undefined, Msg, Session).
t_publish_qos1(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
{ok, [], _Session} = emqx_session:publish(1, Msg, session()).
Msg = emqx_message:make(clientid, ?QOS_1, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, Session = session()),
{ok, [], Session} = emqx_session:publish(2, Msg, Session).
t_publish_qos2(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<"payload">>),
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, session()),
?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)).
?assertEqual(1, emqx_session:info(awaiting_rel_cnt, Session)),
{ok, Session1} = emqx_session:pubrel(1, Session),
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session1)),
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, Session1).
t_publish_qos2_with_error_return(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Session = session(#{max_awaiting_rel => 2,
awaiting_rel => #{1 => ts(millisecond)}
}),
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload">>),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:publish(1, Msg, Session),
{ok, [], Session1} = emqx_session:publish(2, Msg, Session),
?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(3, Msg, Session1).
t_is_awaiting_full_false(_) ->
?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))).
Session = session(#{max_awaiting_rel => 0}),
?assertNot(emqx_session:is_awaiting_full(Session)).
t_is_awaiting_full_true(_) ->
Session = session(#{max_awaiting_rel => 1,
awaiting_rel => #{1 => 1}
awaiting_rel => #{1 => ts(millisecond)}
}),
?assert(emqx_session:is_awaiting_full(Session)).
t_puback(_) ->
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, {Msg, erlang:system_time(millisecond)}, emqx_inflight:new()),
Session = set_field(inflight, Inflight, session()),
{ok, Msg, NSession} = emqx_session:puback(1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
{ok, Msg, Session1} = emqx_session:puback(1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
t_puback_with_dequeue(_) ->
Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>),
Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()),
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
Session = session(#{inflight => Inflight, mqueue => Q}),
{ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session),
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
t_puback_error_packet_id_in_use(_) ->
Inflight = emqx_inflight:insert(1, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()),
Session = set_field(inflight, Inflight, session()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:puback(1, session(#{inflight => Inflight})).
t_puback_error_packet_id_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
t_pubrec(_) ->
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(2, {Msg, erlang:system_time(millisecond)}, emqx_inflight:new()),
Session = set_field(inflight, Inflight, session()),
{ok, Msg, NSession} = emqx_session:pubrec(2, Session),
?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, NSession))).
Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Msg, Session1} = emqx_session:pubrec(2, Session),
?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
t_pubrec_packet_id_in_use_error(_) ->
Inflight = emqx_inflight:insert(1, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()),
Session = set_field(inflight, Inflight, session()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, Session).
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:pubrec(1, session(#{inflight => Inflight})).
t_pubrec_packet_id_not_found_error(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrec(1, session()).
t_pubrel(_) ->
Session = set_field(awaiting_rel, #{1 => erlang:system_time(millisecond)}, session()),
{ok, NSession} = emqx_session:pubrel(1, Session),
?assertEqual(#{}, emqx_session:info(awaiting_rel, NSession)).
Session = session(#{awaiting_rel => #{1 => ts(millisecond)}}),
{ok, Session1} = emqx_session:pubrel(1, Session),
?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
t_pubrel_id_not_found(_) ->
t_pubrel_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
t_pubcomp(_) ->
Inflight = emqx_inflight:insert(2, {pubrel, erlang:system_time(millisecond)}, emqx_inflight:new()),
Session = emqx_session:set_field(inflight, Inflight, session()),
{ok, NSession} = emqx_session:pubcomp(2, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, NSession)).
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Session1} = emqx_session:pubcomp(1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
t_pubcomp_id_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(2, session()).
t_pubcomp_error_packetid_in_use(_) ->
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session).
t_pubcomp_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()).
%%--------------------------------------------------------------------
%% Test cases for deliver/retry
%%--------------------------------------------------------------------
t_dequeue(_) ->
{ok, _Session} = emqx_session:dequeue(session()).
Q = mqueue(#{store_qos0 => true}),
{ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})),
Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
],
Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs),
{ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
emqx_session:dequeue(Session1),
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)).
t_deliver(_) ->
t_deliver_qos0(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
{ok, Session} = emqx_session:subscribe(
clientinfo(), <<"t0">>, subopts(), session()),
{ok, Session1} = emqx_session:subscribe(
clientinfo(), <<"t1">>, subopts(), Session),
Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
{ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
emqx_session:deliver(Deliveries, Session1),
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
t_deliver_qos1(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
{ok, Session} = emqx_session:subscribe(
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
{ok, Msg1, Session2} = emqx_session:puback(1, Session1),
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
{ok, Msg2, Session3} = emqx_session:puback(2, Session2),
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
t_deliver_qos2(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
{ok, [{1, Msg1}, {2, Msg2}], Session} =
emqx_session:deliver(Delivers, session()),
?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
t_deliver_one_msg(_) ->
{ok, [{1, Msg}], Session} =
emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()),
?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
t_deliver_when_inflight_is_full(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
{ok, Publishes, _Session} = emqx_session:deliver(Delivers, session()),
?assertEqual(2, length(Publishes)).
Session = session(#{inflight => emqx_inflight:new(1)}),
{ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session),
?assertEqual(1, length(Publishes)),
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
{ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1),
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)).
t_enqueue(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
Session = emqx_session:enqueue(Delivers, session()),
?assertEqual(2, emqx_session:info(mqueue_len, Session)).
%% store_qos0 = true
Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()),
Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>),
delivery(?QOS_2, <<"t2">>)], Session),
?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
t_retry(_) ->
{ok, _Session} = emqx_session:retry(session()).
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
Session = session(#{retry_interval => 100}),
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
ok = timer:sleep(200),
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
%%--------------------------------------------------------------------
%% Test cases for takeover/resume
@ -220,22 +325,53 @@ t_resume(_) ->
Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
ok = emqx_session:resume(<<"clientid">>, Session).
t_redeliver(_) ->
{ok, [], _Session} = emqx_session:redeliver(session()).
t_replay(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()),
Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
Session2 = emqx_session:enqueue(Msg, Session1),
Pubs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
{ok, ReplayPubs, Session3} = emqx_session:replay(Session2),
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
t_expire(_) ->
{ok, _Session} = emqx_session:expire(awaiting_rel, session()).
t_expire_awaiting_rel(_) ->
{ok, Session} = emqx_session:expire(awaiting_rel, session()),
Timeout = emqx_session:info(await_rel_timeout, Session) * 1000,
Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
{ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).
t_expire_awaiting_rel_all(_) ->
Session = session(#{awaiting_rel => #{1 => 1, 2 => 2}}),
{ok, Session1} = emqx_session:expire(awaiting_rel, Session),
?assertEqual(#{}, emqx_session:info(awaiting_rel, Session1)).
%%--------------------------------------------------------------------
%% CT for utility functions
%%--------------------------------------------------------------------
t_next_pakt_id(_) ->
Session = session(#{next_pkt_id => 16#FFFF}),
Session1 = emqx_session:next_pkt_id(Session),
?assertEqual(1, emqx_session:info(next_pkt_id, Session1)),
Session2 = emqx_session:next_pkt_id(Session1),
?assertEqual(2, emqx_session:info(next_pkt_id, Session2)).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
mqueue() -> mqueue(#{}).
mqueue(Opts) ->
emqx_mqueue:init(maps:merge(#{max_len => 0, store_qos0 => false}, Opts)).
session() -> session(#{}).
session(InitFields) when is_map(InitFields) ->
maps:fold(fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session)
end,
emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
InitFields).
@ -252,3 +388,8 @@ subopts(Init) ->
delivery(QoS, Topic) ->
{deliver, Topic, emqx_message:make(test, QoS, Topic, <<"payload">>)}.
ts(second) ->
erlang:system_time(second);
ts(millisecond) ->
erlang:system_time(millisecond).